Refactor exporter - step 10 (#1135)

* make Shutdown/Flush sync

* calculate remaining time

* fix doc

* update changelog

* s/timeoutMillis/timeoutMilliseconds/g

Co-authored-by: Mikel Blanchard <mblanchard@macrosssoftware.com>
This commit is contained in:
Reiley Yang 2020-08-21 18:51:09 -07:00 committed by GitHub
parent f5bffdb7fa
commit 74232b824d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 155 additions and 171 deletions

View File

@ -2,10 +2,10 @@
* To export telemetry to a specific destination, custom exporters must be
written.
* Exporters should inherit from `ActivityExporter` and implement `ExportAsync`
and `ShutdownAsync` methods. `ActivityExporter` is part of the [OpenTelemetry
* Exporters should inherit from `ActivityExporter` and implement `Export` and
`Shutdown` methods. `ActivityExporter` is part of the [OpenTelemetry
Package](https://www.nuget.org/packages/opentelemetry).
* Depending on user's choice and load on the application, `ExportAsync` may get
* Depending on user's choice and load on the application, `Export` may get
called with zero or more activities.
* Exporters will only receive sampled-in and ended activities.
* Exporters must not throw.

View File

@ -39,15 +39,14 @@ internal class MyActivityProcessor : ActivityProcessor
Console.WriteLine($"{this}.OnEnd");
}
public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
Console.WriteLine($"{this}.ForceFlushAsync");
return Task.CompletedTask;
Console.WriteLine($"{this}.ForceFlush");
return true;
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
Console.WriteLine($"{this}.ShutdownAsync");
return Task.CompletedTask;
Console.WriteLine($"{this}.Shutdown");
}
}

View File

@ -6,13 +6,11 @@
critical code path.
```csharp
class MySampler : Sampler
internal class MySampler : Sampler
{
public override SamplingResult ShouldSample(in SamplingParameters samplingParameters)
{
var shouldSample = true;
return new SamplingResult(shouldSample);
return new SamplingResult(SamplingDecision.RecordAndSampled);
}
}
```

View File

@ -18,6 +18,7 @@ using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using OpenTelemetry.Exporter.Jaeger.Implementation;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
@ -61,7 +62,7 @@ namespace OpenTelemetry.Exporter.Jaeger
}
/// <inheritdoc/>
public override void Shutdown()
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
this.JaegerAgentUdpBatcher.FlushAsync(default).GetAwaiter().GetResult();
}

View File

@ -32,7 +32,8 @@
[#1094](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1094)
[#1113](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1113)
[#1127](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1127)
[#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129))
[#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129)
[#1135](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1135))
## 0.4.0-beta.2

View File

@ -16,6 +16,7 @@
using System;
using System.Diagnostics;
using System.Threading;
namespace OpenTelemetry.Trace
{
@ -48,9 +49,14 @@ namespace OpenTelemetry.Trace
public abstract ExportResult Export(in Batch<Activity> batch);
/// <summary>
/// Shuts down the exporter.
/// Attempts to shutdown the exporter, blocks the current thread until
/// shutdown completed or timed out.
/// </summary>
public virtual void Shutdown()
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
}

View File

@ -46,31 +46,31 @@ namespace OpenTelemetry.Trace
}
/// <summary>
/// Shuts down Activity processor asynchronously.
/// Flushes the <see cref="ActivityProcessor"/>, blocks the current
/// thread until flush completed, shutdown signaled or timed out.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public virtual Task ShutdownAsync(CancellationToken cancellationToken)
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public virtual bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
return true;
}
/// <summary>
/// Flushes all activities that have not yet been processed.
/// Attempts to shutdown the processor, blocks the current thread until
/// shutdown completed or timed out.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public virtual Task ForceFlushAsync(CancellationToken cancellationToken)
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
/// <inheritdoc/>
@ -91,7 +91,7 @@ namespace OpenTelemetry.Trace
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
this.Shutdown();
}
catch (Exception ex)
{

View File

@ -143,18 +143,18 @@ namespace OpenTelemetry.Trace
/// the current thread until flush completed, shutdown signaled or
/// timed out.
/// </summary>
/// <param name="timeoutMillis">
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
/// <returns>
/// Returns <c>true</c> when flush completed; otherwise, <c>false</c>.
/// </returns>
public bool ForceFlush(int timeoutMillis = Timeout.Infinite)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}
var tail = this.circularBuffer.RemovedCount;
@ -167,7 +167,7 @@ namespace OpenTelemetry.Trace
this.exportTrigger.Set();
if (timeoutMillis == 0)
if (timeoutMilliseconds == 0)
{
return false;
}
@ -182,13 +182,13 @@ namespace OpenTelemetry.Trace
while (true)
{
if (timeoutMillis == Timeout.Infinite)
if (timeoutMilliseconds == Timeout.Infinite)
{
WaitHandle.WaitAny(triggers, pollingMillis);
}
else
{
var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds;
var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds;
if (timeout <= 0)
{
@ -210,46 +210,30 @@ namespace OpenTelemetry.Trace
}
}
/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ForceFlushAsync(CancellationToken cancellationToken)
{
// TODO
throw new NotImplementedException();
}
/// <summary>
/// Attempt to drain the queue and shutdown the exporter, blocks the
/// Attempts to drain the queue and shutdown the exporter, blocks the
/// current thread until shutdown completed or timed out.
/// </summary>
/// <param name="timeoutMillis">
/// <param name="timeoutMilliseconds">
/// The number of milliseconds to wait, or <c>Timeout.Infinite</c> to
/// wait indefinitely.
/// </param>
public void Shutdown(int timeoutMillis = Timeout.Infinite)
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
throw new ArgumentOutOfRangeException(nameof(timeoutMillis));
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}
this.shutdownDrainTarget = this.circularBuffer.AddedCount;
this.shutdownTrigger.Set();
if (timeoutMillis != 0)
if (timeoutMilliseconds != 0)
{
this.exporterThread.Join(timeoutMillis);
this.exporterThread.Join(timeoutMilliseconds);
}
}
/// <inheritdoc/>
/// <exception cref="OperationCanceledException">If the <paramref name="cancellationToken"/> is canceled.</exception>
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
// TODO
throw new NotImplementedException();
}
/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>

View File

@ -69,6 +69,7 @@ namespace OpenTelemetry.Trace
return this;
}
/// <inheritdoc/>
public override void OnEnd(Activity activity)
{
var cur = this.head;
@ -80,6 +81,7 @@ namespace OpenTelemetry.Trace
}
}
/// <inheritdoc/>
public override void OnStart(Activity activity)
{
var cur = this.head;
@ -91,32 +93,75 @@ namespace OpenTelemetry.Trace
}
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
var cur = this.head;
var task = cur.Value.ShutdownAsync(cancellationToken);
for (cur = cur.Next; cur != null; cur = cur.Next)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
var processor = cur.Value;
task = task.ContinueWith(t => processor.ShutdownAsync(cancellationToken));
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}
return task;
var cur = this.head;
var sw = Stopwatch.StartNew();
while (cur != null)
{
if (timeoutMilliseconds == Timeout.Infinite)
{
var succeeded = cur.Value.ForceFlush(Timeout.Infinite);
}
else
{
var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds;
if (timeout <= 0)
{
return false;
}
var succeeded = cur.Value.ForceFlush((int)timeout);
if (!succeeded)
{
return false;
}
}
cur = cur.Next;
}
return true;
}
public override Task ForceFlushAsync(CancellationToken cancellationToken)
/// <inheritdoc/>
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
var cur = this.head;
var task = cur.Value.ForceFlushAsync(cancellationToken);
for (cur = cur.Next; cur != null; cur = cur.Next)
if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite)
{
var processor = cur.Value;
task = task.ContinueWith(t => processor.ForceFlushAsync(cancellationToken));
throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds));
}
return task;
var cur = this.head;
var sw = Stopwatch.StartNew();
while (cur != null)
{
if (timeoutMilliseconds == Timeout.Infinite)
{
cur.Value.Shutdown(Timeout.Infinite);
}
else
{
var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds;
// notify all the processors, even if we run overtime
cur.Value.Shutdown((int)Math.Max(timeout, 0));
}
cur = cur.Next;
}
}
protected override void Dispose(bool disposing)

View File

@ -53,19 +53,14 @@ namespace OpenTelemetry.Trace
}
/// <inheritdoc />
public override Task ShutdownAsync(CancellationToken cancellationToken)
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
if (!this.stopped)
{
// TODO: pass down the timeout to exporter
this.exporter.Shutdown();
this.stopped = true;
}
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
/// <summary>

View File

@ -53,24 +53,15 @@ namespace OpenTelemetry.Exporter.Jaeger.Tests
this.EndAction?.Invoke(activity);
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
this.ShutdownCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
this.ForceFlushCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
return true;
}
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
this.ShutdownCalled = true;
}
protected override void Dispose(bool disposing)

View File

@ -53,24 +53,15 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests
this.EndAction?.Invoke(span);
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
this.ShutdownCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
this.ForceFlushCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
return true;
}
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
this.ShutdownCalled = true;
}
protected override void Dispose(bool disposing)

View File

@ -54,24 +54,15 @@ namespace OpenTelemetry.Exporter.ZPages.Tests
this.EndAction?.Invoke(activity);
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
this.ShutdownCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
this.ForceFlushCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
return true;
}
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
this.ShutdownCalled = true;
}
protected override void Dispose(bool disposing)

View File

@ -53,24 +53,15 @@ namespace OpenTelemetry.Exporter.Zipkin.Tests
this.EndAction?.Invoke(span);
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
this.ShutdownCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
this.ForceFlushCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
return true;
}
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
this.ShutdownCalled = true;
}
protected override void Dispose(bool disposing)

View File

@ -53,24 +53,15 @@ namespace OpenTelemetry.Tests
this.EndAction?.Invoke(span);
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
this.ShutdownCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
public override Task ForceFlushAsync(CancellationToken cancellationToken)
public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite)
{
this.ForceFlushCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
return true;
}
public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite)
{
this.ShutdownCalled = true;
}
protected override void Dispose(bool disposing)

View File

@ -77,7 +77,7 @@ namespace OpenTelemetry.Trace.Tests
using (var processor = new CompositeActivityProcessor(new[] { p1, p2 }))
{
processor.ShutdownAsync(default).Wait();
processor.Shutdown();
Assert.True(p1.ShutdownCalled);
Assert.True(p2.ShutdownCalled);
}
@ -91,7 +91,7 @@ namespace OpenTelemetry.Trace.Tests
using (var processor = new CompositeActivityProcessor(new[] { p1, p2 }))
{
processor.ForceFlushAsync(default).Wait();
processor.ForceFlush();
Assert.True(p1.ForceFlushCalled);
Assert.True(p2.ForceFlushCalled);
}