Refactor exporter - step 3 (#1083)

* switch console exporter to SimpleExportActivityProcessor

* use synchronization in SimpleExportActivityProcessor
This commit is contained in:
Reiley Yang 2020-08-14 21:31:54 -07:00 committed by GitHub
parent a4237cf104
commit 20bf5c924e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 106 additions and 61 deletions

View File

@ -27,10 +27,10 @@ using OpenTelemetry.Trace;
namespace OpenTelemetry.Exporter.Console
{
public class ConsoleExporter : ActivityExporter
public class ConsoleExporter : ActivityExporterSync
{
private readonly JsonSerializerOptions serializerOptions;
private bool displayAsJson;
private readonly bool displayAsJson;
public ConsoleExporter(ConsoleExporterOptions options)
{
@ -46,9 +46,9 @@ namespace OpenTelemetry.Exporter.Console
this.serializerOptions.Converters.Add(new ActivityTraceIdConverter());
}
public override Task<ExportResult> ExportAsync(IEnumerable<Activity> activityBatch, CancellationToken cancellationToken)
public override ExportResultSync Export(IEnumerable<Activity> batch)
{
foreach (var activity in activityBatch)
foreach (var activity in batch)
{
if (this.displayAsJson)
{
@ -127,12 +127,7 @@ namespace OpenTelemetry.Exporter.Console
}
}
return Task.FromResult(ExportResult.Success);
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
return ExportResultSync.Success;
}
}
}

View File

@ -36,7 +36,7 @@ namespace OpenTelemetry.Trace
var options = new ConsoleExporterOptions();
configure?.Invoke(options);
return builder.AddProcessor(new SimpleActivityProcessor(new ConsoleExporter(options)));
return builder.AddProcessor(new SimpleExportActivityProcessor(new ConsoleExporter(options)));
}
}
}

View File

@ -46,7 +46,7 @@ namespace OpenTelemetry.Trace
/// </summary>
/// <param name="batch">Batch of activities to export.</param>
/// <returns>Result of export.</returns>
public abstract ExportResult Export(IEnumerable<Activity> batch);
public abstract ExportResultSync Export(IEnumerable<Activity> batch);
/// <summary>
/// Shuts down the exporter.

View File

@ -0,0 +1,93 @@
// <copyright file="ReentrantExportActivityProcessor.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;
namespace OpenTelemetry.Trace
{
/// <summary>
/// Implements activity processor that exports <see cref="Activity"/> at each OnEnd call without synchronization.
/// </summary>
public class ReentrantExportActivityProcessor : ActivityProcessor
{
private readonly ActivityExporterSync exporter;
private bool stopped;
/// <summary>
/// Initializes a new instance of the <see cref="ReentrantExportActivityProcessor"/> class.
/// </summary>
/// <param name="exporter">Activity exporter instance.</param>
public ReentrantExportActivityProcessor(ActivityExporterSync exporter)
{
this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
}
/// <inheritdoc />
public override void OnEnd(Activity activity)
{
try
{
// TODO: avoid heap allocation
_ = this.exporter.Export(new[] { activity });
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnEnd), ex);
}
}
/// <inheritdoc />
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
if (!this.stopped)
{
this.exporter.Shutdown();
this.stopped = true;
}
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>
/// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
try
{
this.exporter.Dispose();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
}
}
}
}

View File

@ -23,70 +23,27 @@ using OpenTelemetry.Internal;
namespace OpenTelemetry.Trace
{
/// <summary>
/// Implements simple activity processor that exports activities in OnEnd call without batching.
/// Implements activity processor that exports <see cref="Activity"/> at each OnEnd call.
/// </summary>
public class SimpleExportActivityProcessor : ActivityProcessor
public class SimpleExportActivityProcessor : ReentrantExportActivityProcessor
{
private readonly ActivityExporterSync exporter;
private bool stopped;
private readonly object lck = new object();
/// <summary>
/// Initializes a new instance of the <see cref="SimpleExportActivityProcessor"/> class.
/// </summary>
/// <param name="exporter">Activity exporter instance.</param>
public SimpleExportActivityProcessor(ActivityExporterSync exporter)
: base(exporter)
{
this.exporter = exporter ?? throw new ArgumentNullException(nameof(exporter));
}
/// <inheritdoc />
public override void OnEnd(Activity activity)
{
try
lock (this.lck)
{
// TODO: avoid heap allocation
_ = this.exporter.Export(new[] { activity });
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.OnEnd), ex);
}
}
/// <inheritdoc />
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
if (!this.stopped)
{
this.exporter.Shutdown();
this.stopped = true;
}
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>
/// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing)
{
try
{
this.exporter.Dispose();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
base.OnEnd(activity);
}
}
}