From 74232b824de7eb7f92abf86fdfeda250c961f0e8 Mon Sep 17 00:00:00 2001 From: Reiley Yang Date: Fri, 21 Aug 2020 18:51:09 -0700 Subject: [PATCH] Refactor exporter - step 10 (#1135) * make Shutdown/Flush sync * calculate remaining time * fix doc * update changelog * s/timeoutMillis/timeoutMilliseconds/g Co-authored-by: Mikel Blanchard --- .../building-your-own-exporter/README.md | 6 +- .../MyActivityProcessor.cs | 11 ++- .../trace/building-your-own-sampler/README.md | 6 +- .../JaegerExporter.cs | 3 +- src/OpenTelemetry/CHANGELOG.md | 3 +- src/OpenTelemetry/Trace/ActivityExporter.cs | 10 ++- src/OpenTelemetry/Trace/ActivityProcessor.cs | 38 ++++----- .../Trace/BatchExportActivityProcessor.cs | 44 ++++------- .../Trace/CompositeActivityProcessor.cs | 77 +++++++++++++++---- .../Trace/ReentrantExportActivityProcessor.cs | 9 +-- .../TestActivityProcessor.cs | 23 ++---- .../TestActivityProcessor.cs | 23 ++---- .../TestActivityProcessor.cs | 23 ++---- .../TestActivityProcessor.cs | 23 ++---- .../Shared/TestActivityProcessor.cs | 23 ++---- .../Trace/CompositeActivityProcessorTests.cs | 4 +- 16 files changed, 155 insertions(+), 171 deletions(-) diff --git a/docs/trace/building-your-own-exporter/README.md b/docs/trace/building-your-own-exporter/README.md index 499a79ec4..0d171695a 100644 --- a/docs/trace/building-your-own-exporter/README.md +++ b/docs/trace/building-your-own-exporter/README.md @@ -2,10 +2,10 @@ * To export telemetry to a specific destination, custom exporters must be written. -* Exporters should inherit from `ActivityExporter` and implement `ExportAsync` - and `ShutdownAsync` methods. `ActivityExporter` is part of the [OpenTelemetry +* Exporters should inherit from `ActivityExporter` and implement `Export` and + `Shutdown` methods. `ActivityExporter` is part of the [OpenTelemetry Package](https://www.nuget.org/packages/opentelemetry). -* Depending on user's choice and load on the application, `ExportAsync` may get +* Depending on user's choice and load on the application, `Export` may get called with zero or more activities. * Exporters will only receive sampled-in and ended activities. * Exporters must not throw. diff --git a/docs/trace/building-your-own-processor/MyActivityProcessor.cs b/docs/trace/building-your-own-processor/MyActivityProcessor.cs index 92a71ecd8..5707b7ee7 100644 --- a/docs/trace/building-your-own-processor/MyActivityProcessor.cs +++ b/docs/trace/building-your-own-processor/MyActivityProcessor.cs @@ -39,15 +39,14 @@ internal class MyActivityProcessor : ActivityProcessor Console.WriteLine($"{this}.OnEnd"); } - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - Console.WriteLine($"{this}.ForceFlushAsync"); - return Task.CompletedTask; + Console.WriteLine($"{this}.ForceFlush"); + return true; } - public override Task ShutdownAsync(CancellationToken cancellationToken) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - Console.WriteLine($"{this}.ShutdownAsync"); - return Task.CompletedTask; + Console.WriteLine($"{this}.Shutdown"); } } diff --git a/docs/trace/building-your-own-sampler/README.md b/docs/trace/building-your-own-sampler/README.md index 2271ad58e..1b21ecc19 100644 --- a/docs/trace/building-your-own-sampler/README.md +++ b/docs/trace/building-your-own-sampler/README.md @@ -6,13 +6,11 @@ critical code path. ```csharp -class MySampler : Sampler +internal class MySampler : Sampler { public override SamplingResult ShouldSample(in SamplingParameters samplingParameters) { - var shouldSample = true; - - return new SamplingResult(shouldSample); + return new SamplingResult(SamplingDecision.RecordAndSampled); } } ``` diff --git a/src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs b/src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs index ab2ee1452..8ca39414d 100644 --- a/src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs +++ b/src/OpenTelemetry.Exporter.Jaeger/JaegerExporter.cs @@ -18,6 +18,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Threading; using OpenTelemetry.Exporter.Jaeger.Implementation; using OpenTelemetry.Resources; using OpenTelemetry.Trace; @@ -61,7 +62,7 @@ namespace OpenTelemetry.Exporter.Jaeger } /// - public override void Shutdown() + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { this.JaegerAgentUdpBatcher.FlushAsync(default).GetAwaiter().GetResult(); } diff --git a/src/OpenTelemetry/CHANGELOG.md b/src/OpenTelemetry/CHANGELOG.md index 3bb44b82e..7394811bf 100644 --- a/src/OpenTelemetry/CHANGELOG.md +++ b/src/OpenTelemetry/CHANGELOG.md @@ -32,7 +32,8 @@ [#1094](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1094) [#1113](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1113) [#1127](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1127) - [#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129)) + [#1129](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1129) + [#1135](https://github.com/open-telemetry/opentelemetry-dotnet/pull/1135)) ## 0.4.0-beta.2 diff --git a/src/OpenTelemetry/Trace/ActivityExporter.cs b/src/OpenTelemetry/Trace/ActivityExporter.cs index 4c0aa6889..357c199cb 100644 --- a/src/OpenTelemetry/Trace/ActivityExporter.cs +++ b/src/OpenTelemetry/Trace/ActivityExporter.cs @@ -16,6 +16,7 @@ using System; using System.Diagnostics; +using System.Threading; namespace OpenTelemetry.Trace { @@ -48,9 +49,14 @@ namespace OpenTelemetry.Trace public abstract ExportResult Export(in Batch batch); /// - /// Shuts down the exporter. + /// Attempts to shutdown the exporter, blocks the current thread until + /// shutdown completed or timed out. /// - public virtual void Shutdown() + /// + /// The number of milliseconds to wait, or Timeout.Infinite to + /// wait indefinitely. + /// + public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { } diff --git a/src/OpenTelemetry/Trace/ActivityProcessor.cs b/src/OpenTelemetry/Trace/ActivityProcessor.cs index 45e6950ac..7acf08c0b 100644 --- a/src/OpenTelemetry/Trace/ActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/ActivityProcessor.cs @@ -46,31 +46,31 @@ namespace OpenTelemetry.Trace } /// - /// Shuts down Activity processor asynchronously. + /// Flushes the , blocks the current + /// thread until flush completed, shutdown signaled or timed out. /// - /// Cancellation token. - /// Returns . - public virtual Task ShutdownAsync(CancellationToken cancellationToken) + /// + /// The number of milliseconds to wait, or Timeout.Infinite to + /// wait indefinitely. + /// + /// + /// Returns true when flush completed; otherwise, false. + /// + public virtual bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + return true; } /// - /// Flushes all activities that have not yet been processed. + /// Attempts to shutdown the processor, blocks the current thread until + /// shutdown completed or timed out. /// - /// Cancellation token. - /// Returns . - public virtual Task ForceFlushAsync(CancellationToken cancellationToken) + /// + /// The number of milliseconds to wait, or Timeout.Infinite to + /// wait indefinitely. + /// + public virtual void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif } /// @@ -91,7 +91,7 @@ namespace OpenTelemetry.Trace { try { - this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult(); + this.Shutdown(); } catch (Exception ex) { diff --git a/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs b/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs index 93dd0c602..9aab34dfd 100644 --- a/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/BatchExportActivityProcessor.cs @@ -143,18 +143,18 @@ namespace OpenTelemetry.Trace /// the current thread until flush completed, shutdown signaled or /// timed out. /// - /// + /// /// The number of milliseconds to wait, or Timeout.Infinite to /// wait indefinitely. /// /// /// Returns true when flush completed; otherwise, false. /// - public bool ForceFlush(int timeoutMillis = Timeout.Infinite) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - throw new ArgumentOutOfRangeException(nameof(timeoutMillis)); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds)); } var tail = this.circularBuffer.RemovedCount; @@ -167,7 +167,7 @@ namespace OpenTelemetry.Trace this.exportTrigger.Set(); - if (timeoutMillis == 0) + if (timeoutMilliseconds == 0) { return false; } @@ -182,13 +182,13 @@ namespace OpenTelemetry.Trace while (true) { - if (timeoutMillis == Timeout.Infinite) + if (timeoutMilliseconds == Timeout.Infinite) { WaitHandle.WaitAny(triggers, pollingMillis); } else { - var timeout = (long)timeoutMillis - sw.ElapsedMilliseconds; + var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds; if (timeout <= 0) { @@ -210,46 +210,30 @@ namespace OpenTelemetry.Trace } } - /// - /// If the is canceled. - public override Task ForceFlushAsync(CancellationToken cancellationToken) - { - // TODO - throw new NotImplementedException(); - } - /// - /// Attempt to drain the queue and shutdown the exporter, blocks the + /// Attempts to drain the queue and shutdown the exporter, blocks the /// current thread until shutdown completed or timed out. /// - /// + /// /// The number of milliseconds to wait, or Timeout.Infinite to /// wait indefinitely. /// - public void Shutdown(int timeoutMillis = Timeout.Infinite) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - if (timeoutMillis < 0 && timeoutMillis != Timeout.Infinite) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - throw new ArgumentOutOfRangeException(nameof(timeoutMillis)); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds)); } this.shutdownDrainTarget = this.circularBuffer.AddedCount; this.shutdownTrigger.Set(); - if (timeoutMillis != 0) + if (timeoutMilliseconds != 0) { - this.exporterThread.Join(timeoutMillis); + this.exporterThread.Join(timeoutMilliseconds); } } - /// - /// If the is canceled. - public override Task ShutdownAsync(CancellationToken cancellationToken) - { - // TODO - throw new NotImplementedException(); - } - /// /// Releases the unmanaged resources used by this class and optionally releases the managed resources. /// diff --git a/src/OpenTelemetry/Trace/CompositeActivityProcessor.cs b/src/OpenTelemetry/Trace/CompositeActivityProcessor.cs index 975b179ef..37d8178b2 100644 --- a/src/OpenTelemetry/Trace/CompositeActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/CompositeActivityProcessor.cs @@ -69,6 +69,7 @@ namespace OpenTelemetry.Trace return this; } + /// public override void OnEnd(Activity activity) { var cur = this.head; @@ -80,6 +81,7 @@ namespace OpenTelemetry.Trace } } + /// public override void OnStart(Activity activity) { var cur = this.head; @@ -91,32 +93,75 @@ namespace OpenTelemetry.Trace } } - public override Task ShutdownAsync(CancellationToken cancellationToken) + /// + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { - var cur = this.head; - var task = cur.Value.ShutdownAsync(cancellationToken); - - for (cur = cur.Next; cur != null; cur = cur.Next) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - var processor = cur.Value; - task = task.ContinueWith(t => processor.ShutdownAsync(cancellationToken)); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds)); } - return task; + var cur = this.head; + + var sw = Stopwatch.StartNew(); + + while (cur != null) + { + if (timeoutMilliseconds == Timeout.Infinite) + { + var succeeded = cur.Value.ForceFlush(Timeout.Infinite); + } + else + { + var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds; + + if (timeout <= 0) + { + return false; + } + + var succeeded = cur.Value.ForceFlush((int)timeout); + + if (!succeeded) + { + return false; + } + } + + cur = cur.Next; + } + + return true; } - public override Task ForceFlushAsync(CancellationToken cancellationToken) + /// + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { - var cur = this.head; - var task = cur.Value.ForceFlushAsync(cancellationToken); - - for (cur = cur.Next; cur != null; cur = cur.Next) + if (timeoutMilliseconds < 0 && timeoutMilliseconds != Timeout.Infinite) { - var processor = cur.Value; - task = task.ContinueWith(t => processor.ForceFlushAsync(cancellationToken)); + throw new ArgumentOutOfRangeException(nameof(timeoutMilliseconds)); } - return task; + var cur = this.head; + + var sw = Stopwatch.StartNew(); + + while (cur != null) + { + if (timeoutMilliseconds == Timeout.Infinite) + { + cur.Value.Shutdown(Timeout.Infinite); + } + else + { + var timeout = (long)timeoutMilliseconds - sw.ElapsedMilliseconds; + + // notify all the processors, even if we run overtime + cur.Value.Shutdown((int)Math.Max(timeout, 0)); + } + + cur = cur.Next; + } } protected override void Dispose(bool disposing) diff --git a/src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs b/src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs index c7847b138..759260809 100644 --- a/src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs +++ b/src/OpenTelemetry/Trace/ReentrantExportActivityProcessor.cs @@ -53,19 +53,14 @@ namespace OpenTelemetry.Trace } /// - public override Task ShutdownAsync(CancellationToken cancellationToken) + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) { if (!this.stopped) { + // TODO: pass down the timeout to exporter this.exporter.Shutdown(); this.stopped = true; } - -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif } /// diff --git a/test/OpenTelemetry.Exporter.Jaeger.Tests/TestActivityProcessor.cs b/test/OpenTelemetry.Exporter.Jaeger.Tests/TestActivityProcessor.cs index a4dfb3715..175997c21 100644 --- a/test/OpenTelemetry.Exporter.Jaeger.Tests/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Exporter.Jaeger.Tests/TestActivityProcessor.cs @@ -53,24 +53,15 @@ namespace OpenTelemetry.Exporter.Jaeger.Tests this.EndAction?.Invoke(activity); } - public override Task ShutdownAsync(CancellationToken cancellationToken) - { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif - } - - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + return true; + } + + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) + { + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestActivityProcessor.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestActivityProcessor.cs index b90ab90be..c3daaaeee 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/TestActivityProcessor.cs @@ -53,24 +53,15 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests this.EndAction?.Invoke(span); } - public override Task ShutdownAsync(CancellationToken cancellationToken) - { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif - } - - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + return true; + } + + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) + { + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Exporter.ZPages.Tests/TestActivityProcessor.cs b/test/OpenTelemetry.Exporter.ZPages.Tests/TestActivityProcessor.cs index 94bab7960..7c01cc3a7 100644 --- a/test/OpenTelemetry.Exporter.ZPages.Tests/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Exporter.ZPages.Tests/TestActivityProcessor.cs @@ -54,24 +54,15 @@ namespace OpenTelemetry.Exporter.ZPages.Tests this.EndAction?.Invoke(activity); } - public override Task ShutdownAsync(CancellationToken cancellationToken) - { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif - } - - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + return true; + } + + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) + { + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Exporter.Zipkin.Tests/TestActivityProcessor.cs b/test/OpenTelemetry.Exporter.Zipkin.Tests/TestActivityProcessor.cs index 414a9d6ae..0d60e4a69 100644 --- a/test/OpenTelemetry.Exporter.Zipkin.Tests/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Exporter.Zipkin.Tests/TestActivityProcessor.cs @@ -53,24 +53,15 @@ namespace OpenTelemetry.Exporter.Zipkin.Tests this.EndAction?.Invoke(span); } - public override Task ShutdownAsync(CancellationToken cancellationToken) - { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif - } - - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + return true; + } + + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) + { + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Tests/Shared/TestActivityProcessor.cs b/test/OpenTelemetry.Tests/Shared/TestActivityProcessor.cs index 4dd515852..134666eab 100644 --- a/test/OpenTelemetry.Tests/Shared/TestActivityProcessor.cs +++ b/test/OpenTelemetry.Tests/Shared/TestActivityProcessor.cs @@ -53,24 +53,15 @@ namespace OpenTelemetry.Tests this.EndAction?.Invoke(span); } - public override Task ShutdownAsync(CancellationToken cancellationToken) - { - this.ShutdownCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif - } - - public override Task ForceFlushAsync(CancellationToken cancellationToken) + public override bool ForceFlush(int timeoutMilliseconds = Timeout.Infinite) { this.ForceFlushCalled = true; -#if NET452 - return Task.FromResult(0); -#else - return Task.CompletedTask; -#endif + return true; + } + + public override void Shutdown(int timeoutMilliseconds = Timeout.Infinite) + { + this.ShutdownCalled = true; } protected override void Dispose(bool disposing) diff --git a/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs b/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs index b7b81a527..1a3f59d4c 100644 --- a/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs +++ b/test/OpenTelemetry.Tests/Trace/CompositeActivityProcessorTests.cs @@ -77,7 +77,7 @@ namespace OpenTelemetry.Trace.Tests using (var processor = new CompositeActivityProcessor(new[] { p1, p2 })) { - processor.ShutdownAsync(default).Wait(); + processor.Shutdown(); Assert.True(p1.ShutdownCalled); Assert.True(p2.ShutdownCalled); } @@ -91,7 +91,7 @@ namespace OpenTelemetry.Trace.Tests using (var processor = new CompositeActivityProcessor(new[] { p1, p2 })) { - processor.ForceFlushAsync(default).Wait(); + processor.ForceFlush(); Assert.True(p1.ForceFlushCalled); Assert.True(p2.ForceFlushCalled); }