Updating ActivityProcessor (#975)

* Updating ActivityProcessor

updating tests

updating tests - 2

updating merge

updating tests - 3

updating tests - 4

using inrange instead of equal

updating tests

* updating to inRange

* following docs.microsoft

* updating tests and changelog

* Adding in changelog custom behavior

* Removing duplicated IDisposable

* forcing dispose

* updating tests

* moving to single instead of equal one

* returning to default

* setting more time to pass tests

* forcing dispose

* reiley's suggestions

* reverting order

Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
This commit is contained in:
Eddy Nakamura 2020-08-04 18:40:30 -03:00 committed by GitHub
parent da8cd0d5c8
commit 5cdfd30284
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 83 additions and 86 deletions

View File

@ -37,7 +37,7 @@ namespace OpenTelemetry.Exporter.Jaeger.Implementation
private readonly System.Timers.Timer maxFlushIntervalTimer;
private Dictionary<string, Process> processCache;
private int batchByteSize;
private bool isDisposed;
private bool disposed;
public JaegerUdpBatcher(JaegerExporterOptions options, TTransport clientTransport = null)
{
@ -222,7 +222,7 @@ namespace OpenTelemetry.Exporter.Jaeger.Implementation
}
}
protected virtual void Dispose(bool isDisposing)
protected virtual void Dispose(bool disposing)
{
try
{
@ -232,7 +232,7 @@ namespace OpenTelemetry.Exporter.Jaeger.Implementation
{
}
if (isDisposing && !this.isDisposed)
if (disposing && !this.disposed)
{
this.maxFlushIntervalTimer.Dispose();
this.thriftClient.Dispose();
@ -240,7 +240,7 @@ namespace OpenTelemetry.Exporter.Jaeger.Implementation
this.memoryProtocol.Dispose();
this.flushLock.Dispose();
this.isDisposed = true;
this.disposed = true;
}
}

View File

@ -29,7 +29,7 @@ namespace OpenTelemetry.Exporter.ZPages
/// <summary>
/// Implements the zpages span processor that exports spans in OnEnd call without batching.
/// </summary>
public class ZPagesProcessor : ActivityProcessor, IDisposable
public class ZPagesProcessor : ActivityProcessor
{
private readonly ZPagesExporter exporter;
@ -133,8 +133,8 @@ namespace OpenTelemetry.Exporter.ZPages
throw new NotImplementedException();
}
/// <inheritdoc />
public void Dispose()
/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
throw new NotImplementedException();
}

View File

@ -2,6 +2,14 @@
## Unreleased
* `ActivityProcessor` implements `IDisposable`.
* When `Dispose` occurs, it calls `ShutdownAsync`.
* If you want a custom behavior for dispose, you will have to override the
`Dispose(bool disposing)`.
* `BatchingActivityProcessor`/`SimpleActivityProcessor` is disposable and it
disposes the containing exporter.
* `BroadcastActivityProcessor`is disposable and it disposes the processors.
## 0.3.0-beta
Released 2020-07-23

View File

@ -13,16 +13,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;
namespace OpenTelemetry.Trace
{
/// <summary>
/// Activity processor base class.
/// </summary>
public abstract class ActivityProcessor
public abstract class ActivityProcessor : IDisposable
{
/// <summary>
/// Activity start hook.
@ -49,5 +52,24 @@ namespace OpenTelemetry.Trace
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Returns <see cref="Task"/>.</returns>
public abstract Task ForceFlushAsync(CancellationToken cancellationToken);
/// <inheritdoc/>
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
}
}
}

View File

@ -27,7 +27,7 @@ namespace OpenTelemetry.Trace
/// <summary>
/// Implements processor that batches activities before calling exporter.
/// </summary>
public class BatchingActivityProcessor : ActivityProcessor, IDisposable
public class BatchingActivityProcessor : ActivityProcessor
{
private const int DefaultMaxQueueSize = 2048;
private const int DefaultMaxExportBatchSize = 512;
@ -44,7 +44,7 @@ namespace OpenTelemetry.Trace
private readonly SemaphoreSlim flushLock = new SemaphoreSlim(1);
private readonly System.Timers.Timer flushTimer;
private volatile int currentQueueSize;
private bool isDisposed;
private bool disposed;
/// <summary>
/// Initializes a new instance of the <see cref="BatchingActivityProcessor"/> class with default parameters:
@ -202,28 +202,15 @@ namespace OpenTelemetry.Trace
OpenTelemetrySdkEventSource.Log.ForceFlushCompleted(this.currentQueueSize);
}
/// <inheritdoc/>
public void Dispose()
{
this.Dispose(true);
}
/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>
/// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing)
protected override void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
base.Dispose(disposing);
if (disposing && !this.isDisposed)
if (disposing && !this.disposed)
{
if (this.exporter is IDisposable disposableExporter)
{
@ -239,7 +226,7 @@ namespace OpenTelemetry.Trace
this.flushTimer.Dispose();
this.flushLock.Dispose();
this.isDisposed = true;
this.disposed = true;
}
}

View File

@ -24,10 +24,10 @@ using OpenTelemetry.Internal;
namespace OpenTelemetry.Trace.Internal
{
internal class BroadcastActivityProcessor : ActivityProcessor, IDisposable
internal class BroadcastActivityProcessor : ActivityProcessor
{
private readonly IEnumerable<ActivityProcessor> processors;
private bool isDisposed;
private bool disposed;
public BroadcastActivityProcessor(IEnumerable<ActivityProcessor> processors)
{
@ -96,23 +96,11 @@ namespace OpenTelemetry.Trace.Internal
return Task.WhenAll(tasks);
}
public void Dispose()
protected override void Dispose(bool disposing)
{
this.Dispose(true);
}
base.Dispose(disposing);
protected virtual void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
if (disposing && !this.isDisposed)
if (disposing && !this.disposed)
{
foreach (var processor in this.processors)
{
@ -129,7 +117,7 @@ namespace OpenTelemetry.Trace.Internal
}
}
this.isDisposed = true;
this.disposed = true;
}
}
}

View File

@ -24,7 +24,7 @@ namespace OpenTelemetry.Trace
/// <summary>
/// Implements simple activity processor that exports activities in OnEnd call without batching.
/// </summary>
public class SimpleActivityProcessor : ActivityProcessor, IDisposable
public class SimpleActivityProcessor : ActivityProcessor
{
private readonly ActivityExporter exporter;
private bool stopped;
@ -85,26 +85,13 @@ namespace OpenTelemetry.Trace
#endif
}
/// <inheritdoc/>
public void Dispose()
{
this.Dispose(true);
}
/// <summary>
/// Releases the unmanaged resources used by this class and optionally releases the managed resources.
/// </summary>
/// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langword="false"/> to release only unmanaged resources.</param>
protected virtual void Dispose(bool disposing)
protected override void Dispose(bool disposing)
{
try
{
this.ShutdownAsync(CancellationToken.None).GetAwaiter().GetResult();
}
catch (Exception ex)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException(nameof(this.Dispose), ex);
}
base.Dispose(disposing);
if (disposing)
{

View File

@ -73,7 +73,7 @@ namespace OpenTelemetry.Exporter.Jaeger.Tests
#endif
}
public void Dispose()
protected override void Dispose(bool disposing)
{
this.DisposedCalled = true;
}

View File

@ -73,7 +73,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests
#endif
}
public void Dispose()
protected override void Dispose(bool disposing)
{
this.DisposedCalled = true;
}

View File

@ -74,7 +74,7 @@ namespace OpenTelemetry.Exporter.ZPages.Tests
#endif
}
public void Dispose()
protected override void Dispose(bool disposing)
{
this.DisposedCalled = true;
}

View File

@ -73,7 +73,7 @@ namespace OpenTelemetry.Exporter.Zipkin.Tests
#endif
}
public void Dispose()
protected override void Dispose(bool disposing)
{
this.DisposedCalled = true;
}

View File

@ -194,7 +194,7 @@ namespace OpenTelemetry.Instrumentation.AspNet.Tests
var currentActivity = Activity.Current;
Activity span;
Assert.Equal(2, activityProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, activityProcessor.Invocations.Count); // begin/end/dispose was called
span = (Activity)activityProcessor.Invocations[1].Arguments[0];
Assert.Equal(routeTemplate ?? HttpContext.Current.Request.Path, span.DisplayName);

View File

@ -66,7 +66,7 @@ namespace OpenTelemetry.Instrumentation.GrpcClient.Tests
var rs = client.SayHello(new HelloRequest());
}
Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called
var span = (Activity)spanProcessor.Invocations[1].Arguments[0];
Assert.Equal(parent.TraceId, span.Context.TraceId);
@ -117,7 +117,7 @@ namespace OpenTelemetry.Instrumentation.GrpcClient.Tests
var rs = client.SayHello(new HelloRequest());
}
Assert.Equal(4, spanProcessor.Invocations.Count); // begin and end was called for Grpc call and underlying Http call
Assert.Equal(5, spanProcessor.Invocations.Count); // begin and end was called for Grpc call and underlying Http call + dispose
var httpSpan = (Activity)spanProcessor.Invocations[2].Arguments[0];
var grpcSpan = (Activity)spanProcessor.Invocations[3].Arguments[0];

View File

@ -98,7 +98,7 @@ namespace OpenTelemetry.Instrumentation.Http.Tests
await c.SendAsync(request);
}
Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called
var span = (Activity)spanProcessor.Invocations[1].Arguments[0];
Assert.Equal(parent.TraceId, span.Context.TraceId);
@ -151,7 +151,7 @@ namespace OpenTelemetry.Instrumentation.Http.Tests
await c.SendAsync(request);
}
Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called
var span = (Activity)spanProcessor.Invocations[1].Arguments[0];
Assert.Equal(parent.TraceId, span.Context.TraceId);
@ -225,7 +225,7 @@ namespace OpenTelemetry.Instrumentation.Http.Tests
await c.SendAsync(request);
}
Assert.Equal(0, spanProcessor.Invocations.Count);
Assert.Equal(1, spanProcessor.Invocations.Count); // dispose
}
[Fact]
@ -243,7 +243,7 @@ namespace OpenTelemetry.Instrumentation.Http.Tests
await c.GetAsync(this.url);
}
Assert.Equal(0, spanProcessor.Invocations.Count);
Assert.Equal(1, spanProcessor.Invocations.Count); // dispose
}
[Fact]
@ -268,7 +268,7 @@ namespace OpenTelemetry.Instrumentation.Http.Tests
}
}
Assert.Equal(0, spanProcessor.Invocations.Count);
Assert.Equal(1, spanProcessor.Invocations.Count); // dispose
}
public void Dispose()

View File

@ -85,7 +85,7 @@ namespace OpenTelemetry.Instrumentation.Http.Tests
}
}
Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called
var span = (Activity)spanProcessor.Invocations[1].Arguments[0];
Assert.Equal(tc.SpanName, span.DisplayName);

View File

@ -173,7 +173,7 @@ namespace OpenTelemetry.Instrumentation.SqlClient.Tests
afterExecuteEventData);
}
Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // start/end/dispose was called
VerifyActivityData(sqlCommand.CommandType, sqlCommand.CommandText, captureStoredProcedureCommandName, captureTextCommandContent, false, sqlConnection.DataSource, (Activity)spanProcessor.Invocations[1].Arguments[0]);
}
@ -219,7 +219,7 @@ namespace OpenTelemetry.Instrumentation.SqlClient.Tests
commandErrorEventData);
}
Assert.Equal(2, spanProcessor.Invocations.Count); // begin and end was called
Assert.Equal(3, spanProcessor.Invocations.Count); // begin and end was called
VerifyActivityData(sqlCommand.CommandType, sqlCommand.CommandText, true, false, true, sqlConnection.DataSource, (Activity)spanProcessor.Invocations[1].Arguments[0]);
}

View File

@ -82,7 +82,7 @@ namespace OpenTelemetry.Instrumentation.SqlClient.Tests
{
}
Assert.Equal(2, activityProcessor.Invocations.Count);
Assert.Equal(3, activityProcessor.Invocations.Count);
var activity = (Activity)activityProcessor.Invocations[1].Arguments[0];
@ -131,8 +131,8 @@ namespace OpenTelemetry.Instrumentation.SqlClient.Tests
int compositeState = successFlag | isSqlExceptionFlag | synchronousFlag;
fakeSqlEventSource.WriteEndExecuteEvent(objectId, compositeState, sqlExceptionNumber);
Assert.Equal(2, activityProcessor.Invocations.Count);
shutdownSignal.Dispose();
Assert.Equal(3, activityProcessor.Invocations.Count);
var activity = (Activity)activityProcessor.Invocations[1].Arguments[0];
@ -153,7 +153,9 @@ namespace OpenTelemetry.Instrumentation.SqlClient.Tests
fakeSqlEventSource.WriteUnknownEventWithNullPayload();
Assert.Equal(0, activityProcessor.Invocations.Count);
shutdownSignal.Dispose();
Assert.Single(activityProcessor.Invocations);
}
[Fact]
@ -171,8 +173,9 @@ namespace OpenTelemetry.Instrumentation.SqlClient.Tests
fakeSqlEventSource.WriteBeginExecuteEvent("arg1");
fakeSqlEventSource.WriteEndExecuteEvent("arg1", "arg2", "arg3", "arg4");
shutdownSignal.Dispose();
Assert.Equal(0, activityProcessor.Invocations.Count);
Assert.Single(activityProcessor.Invocations);
}
private static void VerifyActivityData(

View File

@ -75,7 +75,7 @@ namespace OpenTelemetry.Instrumentation.StackExchangeRedis.Tests
// Disposing SDK should flush the Redis profiling session immediately.
Assert.Equal(4, activityProcessor.Invocations.Count);
Assert.Equal(5, activityProcessor.Invocations.Count);
VerifyActivityData((Activity)activityProcessor.Invocations[1].Arguments[0], true, connection.GetEndPoints()[0]);
VerifyActivityData((Activity)activityProcessor.Invocations[3].Arguments[0], false, connection.GetEndPoints()[0]);

View File

@ -22,7 +22,7 @@ using OpenTelemetry.Trace;
namespace OpenTelemetry.Tests.Implementation.Testing.Export
{
public class TestActivityProcessor : ActivityProcessor, IDisposable
public class TestActivityProcessor : ActivityProcessor
{
public Action<Activity> StartAction;
public Action<Activity> EndAction;
@ -73,7 +73,7 @@ namespace OpenTelemetry.Tests.Implementation.Testing.Export
#endif
}
public void Dispose()
protected override void Dispose(bool disposing)
{
this.DisposedCalled = true;
}

View File

@ -34,7 +34,7 @@ namespace OpenTelemetry.Trace.Test
private const string ActivitySourceName = "my.source";
private static readonly TimeSpan DefaultDelay = TimeSpan.FromMilliseconds(30);
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(1);
private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(2);
private static readonly ActivitySource Source = new ActivitySource(ActivitySourceName);
[Fact]
@ -179,7 +179,7 @@ namespace OpenTelemetry.Trace.Test
var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout);
Assert.Equal(1, exportCalledCount);
Assert.InRange(exported.Length, 1, 2);
Assert.Single(exported);
Assert.Contains(activities.First(), exported);
}
@ -273,6 +273,8 @@ namespace OpenTelemetry.Trace.Test
resetEvent.Set();
openTelemetrySdk.Dispose();
var exported = this.WaitForActivities(activityExporter, 1, DefaultTimeout);
Assert.Single(exported);