Add support for multiple pipelines in OpenTelemetryBuilder with Activity (#735)

* Add support for multiple pipelines in OpenTelemetryBuilder with Activity

* change Setpipeline toAddpipeline

* Dispose activityprocessor
This commit is contained in:
Cijo Thomas 2020-06-17 17:41:19 -07:00 committed by GitHub
parent 10f870ebe5
commit afd9135026
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 857 additions and 29 deletions

View File

@ -42,7 +42,7 @@ namespace OpenTelemetry.Exporter.Console
var exporterOptions = new ConsoleActivityExporterOptions();
configure(exporterOptions);
var consoleExporter = new ConsoleActivityExporter(exporterOptions);
return builder.SetProcessorPipeline(pipeline => pipeline.SetExporter(consoleExporter));
return builder.AddProcessorPipeline(pipeline => pipeline.SetExporter(consoleExporter));
}
}
}

View File

@ -101,7 +101,7 @@ namespace OpenTelemetry.Trace.Configuration
throw new ArgumentNullException(nameof(configure));
}
return builder.SetProcessorPipeline(pipeline =>
return builder.AddProcessorPipeline(pipeline =>
{
var exporterOptions = new JaegerExporterOptions();
configure(exporterOptions);

View File

@ -1,4 +1,4 @@
// <copyright file="TracerBuilderExtensions.cs" company="OpenTelemetry Authors">
// <copyright file="TracerBuilderExtensions.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -102,7 +102,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol
throw new ArgumentNullException(nameof(configure));
}
return builder.SetProcessorPipeline(pipeline =>
return builder.AddProcessorPipeline(pipeline =>
{
var exporterOptions = new ExporterOptions();
configure(exporterOptions);

View File

@ -94,6 +94,7 @@ namespace OpenTelemetry.Trace.Configuration
}
else if (this.Exporter != null)
{
// TODO: Make this BatchingActivityProcessor once its available.
exportingProcessor = new SimpleActivityProcessor(this.Exporter);
this.Processors.Add(exportingProcessor);
}

View File

@ -29,7 +29,7 @@ namespace OpenTelemetry.Trace.Configuration
{
}
internal ActivityProcessorPipelineBuilder ProcessingPipeline { get; private set; }
internal List<ActivityProcessorPipelineBuilder> ProcessingPipelines { get; private set; }
internal List<InstrumentationFactory> InstrumentationFactories { get; private set; }
@ -42,16 +42,21 @@ namespace OpenTelemetry.Trace.Configuration
/// </summary>
/// <param name="configure">Function that configures pipeline.</param>
/// <returns>Returns <see cref="OpenTelemetryBuilder"/> for chaining.</returns>
public OpenTelemetryBuilder SetProcessorPipeline(Action<ActivityProcessorPipelineBuilder> configure)
public OpenTelemetryBuilder AddProcessorPipeline(Action<ActivityProcessorPipelineBuilder> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
if (this.ProcessingPipelines == null)
{
this.ProcessingPipelines = new List<ActivityProcessorPipelineBuilder>();
}
var pipelineBuilder = new ActivityProcessorPipelineBuilder();
configure(pipelineBuilder);
this.ProcessingPipeline = pipelineBuilder;
this.ProcessingPipelines.Add(pipelineBuilder);
return this;
}

View File

@ -17,7 +17,9 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using OpenTelemetry.Trace.Export;
using OpenTelemetry.Trace.Export.Internal;
using OpenTelemetry.Trace.Samplers;
namespace OpenTelemetry.Trace.Configuration
@ -25,6 +27,7 @@ namespace OpenTelemetry.Trace.Configuration
public class OpenTelemetrySdk : IDisposable
{
private readonly List<object> instrumentations = new List<object>();
private ActivityProcessor activityProcessor;
private ActivityListener listener;
static OpenTelemetrySdk()
@ -54,14 +57,29 @@ namespace OpenTelemetry.Trace.Configuration
ActivitySampler sampler = openTelemetryBuilder.Sampler ?? new AlwaysOnActivitySampler();
ActivityProcessor activityProcessor;
if (openTelemetryBuilder.ProcessingPipeline == null)
if (openTelemetryBuilder.ProcessingPipelines == null || !openTelemetryBuilder.ProcessingPipelines.Any())
{
// if there are no pipelines are configured, use noop processor
activityProcessor = new NoopActivityProcessor();
}
else if (openTelemetryBuilder.ProcessingPipelines.Count == 1)
{
// if there is only one pipeline - use it's outer processor as a
// single processor on the tracerSdk.
var processorFactory = openTelemetryBuilder.ProcessingPipelines[0];
activityProcessor = processorFactory.Build();
}
else
{
activityProcessor = openTelemetryBuilder.ProcessingPipeline.Build();
// if there are more pipelines, use processor that will broadcast to all pipelines
var processors = new ActivityProcessor[openTelemetryBuilder.ProcessingPipelines.Count];
for (int i = 0; i < openTelemetryBuilder.ProcessingPipelines.Count; i++)
{
processors[i] = openTelemetryBuilder.ProcessingPipelines[i].Build();
}
activityProcessor = new BroadcastActivityProcessor(processors);
}
var activitySource = new ActivitySourceAdapter(sampler, activityProcessor);
@ -96,6 +114,7 @@ namespace OpenTelemetry.Trace.Configuration
};
ActivitySource.AddActivityListener(openTelemetrySDK.listener);
openTelemetrySDK.activityProcessor = activityProcessor;
return openTelemetrySDK;
}
@ -112,6 +131,11 @@ namespace OpenTelemetry.Trace.Configuration
}
this.instrumentations.Clear();
if (this.activityProcessor is IDisposable disposableProcessor)
{
disposableProcessor.Dispose();
}
}
internal static ActivityDataRequest ComputeActivityDataRequest(

View File

@ -0,0 +1,105 @@
// <copyright file="BroadcastActivityProcessor.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;
namespace OpenTelemetry.Trace.Export.Internal
{
internal class BroadcastActivityProcessor : ActivityProcessor, IDisposable
{
private readonly IEnumerable<ActivityProcessor> processors;
public BroadcastActivityProcessor(IEnumerable<ActivityProcessor> processors)
{
if (processors == null)
{
throw new ArgumentNullException(nameof(processors));
}
if (!processors.Any())
{
throw new ArgumentException($"{nameof(processors)} collection is empty");
}
this.processors = processors;
}
public override void OnEnd(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnEnd(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnEnd", e);
}
}
}
public override void OnStart(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnStart(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnStart", e);
}
}
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>();
foreach (var processor in this.processors)
{
tasks.Add(processor.ShutdownAsync(cancellationToken));
}
return Task.WhenAll(tasks);
}
public void Dispose()
{
foreach (var processor in this.processors)
{
try
{
if (processor is IDisposable disposable)
{
disposable.Dispose();
}
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e);
}
}
}
}
}

View File

@ -162,7 +162,7 @@ namespace OpenTelemetry.Instrumentation.AspNet.Tests
options.TextFormat = textFormat.Object;
}
})
.SetProcessorPipeline(p => p.AddProcessor(_ => activityProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(_ => activityProcessor.Object))))
{
activity.Start();
this.fakeAspNetDiagnosticSource.Write(

View File

@ -61,7 +61,7 @@ namespace OpenTelemetry.Instrumentation.AspNetCore.Tests
void ConfigureTestServices(IServiceCollection services)
{
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry((builder) => builder.AddRequestInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
}
// Arrange
@ -100,7 +100,7 @@ namespace OpenTelemetry.Instrumentation.AspNetCore.Tests
builder.ConfigureTestServices(services =>
{
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry((builder) => builder.AddRequestInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
})))
{
using var client = testFactory.CreateClient();
@ -148,7 +148,7 @@ namespace OpenTelemetry.Instrumentation.AspNetCore.Tests
{
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddRequestInstrumentation((opt) => opt.TextFormat = textFormat.Object)
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
})))
{
using var client = testFactory.CreateClient();
@ -181,7 +181,7 @@ namespace OpenTelemetry.Instrumentation.AspNetCore.Tests
this.openTelemetrySdk = OpenTelemetrySdk.EnableOpenTelemetry(
(builder) =>
builder.AddRequestInstrumentation((opt) => opt.RequestFilter = (ctx) => ctx.Request.Path != "/api/values/2")
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
}
// Arrange

View File

@ -53,7 +53,7 @@ namespace OpenTelemetry.Instrumentation.AspNetCore.Tests
{
services.AddSingleton<CallbackMiddleware.CallbackMiddlewareImpl>(new TestCallbackMiddlewareImpl());
services.AddOpenTelemetrySdk((builder) => builder.AddRequestInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object)));
}))
.CreateClient())
{

View File

@ -74,7 +74,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.SendAsync(request);
@ -124,7 +124,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation((opt) => opt.TextFormat = textFormat.Object)
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.SendAsync(request);
@ -154,7 +154,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.GetAsync(this.url);
@ -172,7 +172,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.GetAsync(this.url);
@ -198,7 +198,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.SendAsync(request);
@ -218,7 +218,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
(opt) => opt.EventFilter = (activityName, arg1, _) => !(activityName == "System.Net.Http.HttpRequestOut" &&
arg1 is HttpRequestMessage request &&
request.RequestUri.OriginalString.Contains(this.url)))
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
await c.GetAsync(this.url);
@ -234,7 +234,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
using var c = new HttpClient();
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));

View File

@ -57,7 +57,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddHttpClientDependencyInstrumentation((opt) => opt.SetHttpFlavor = tc.SetHttpFlavor)
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
try
{

View File

@ -65,7 +65,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
var activityProcessor = new Mock<ActivityProcessor>();
using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b =>
{
b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddHttpWebRequestDependencyInstrumentation();
});
@ -113,7 +113,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
var activityProcessor = new Mock<ActivityProcessor>();
using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b =>
{
b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddHttpWebRequestDependencyInstrumentation();
});
@ -153,7 +153,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
var activityProcessor = new Mock<ActivityProcessor>();
using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b =>
{
b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddHttpWebRequestDependencyInstrumentation();
});

View File

@ -50,7 +50,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
var activityProcessor = new Mock<ActivityProcessor>();
using var shutdownSignal = OpenTelemetrySdk.EnableOpenTelemetry(b =>
{
b.SetProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddProcessorPipeline(c => c.AddProcessor(ap => activityProcessor.Object));
b.AddHttpWebRequestDependencyInstrumentation();
});

View File

@ -71,7 +71,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
opt.CaptureTextCommandContent = captureTextCommandContent;
opt.CaptureStoredProcedureCommandName = captureStoredProcedureCommandName;
})
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
var operationId = Guid.NewGuid();
var sqlConnection = new SqlConnection(TestConnectionString);
@ -160,7 +160,7 @@ namespace OpenTelemetry.Instrumentation.Dependencies.Tests
using (OpenTelemetrySdk.EnableOpenTelemetry(
(builder) => builder.AddSqlClientDependencyInstrumentation()
.SetProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
.AddProcessorPipeline(p => p.AddProcessor(n => spanProcessor.Object))))
{
var operationId = Guid.NewGuid();
var sqlConnection = new SqlConnection(TestConnectionString);

View File

@ -0,0 +1,62 @@
// <copyright file="TestActivityExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Trace.Export;
namespace OpenTelemetry.Testing.Export
{
public class TestActivityExporter : ActivityExporter
{
private readonly ConcurrentQueue<Activity> spanDataList = new ConcurrentQueue<Activity>();
private readonly Action<IEnumerable<Activity>> onExport;
public TestActivityExporter(Action<IEnumerable<Activity>> onExport)
{
this.onExport = onExport;
}
public Activity[] ExportedSpans => this.spanDataList.ToArray();
public bool WasShutDown { get; private set; } = false;
public override Task<ExportResult> ExportAsync(IEnumerable<Activity> data, CancellationToken cancellationToken)
{
this.onExport?.Invoke(data);
foreach (var s in data)
{
this.spanDataList.Enqueue(s);
}
return Task.FromResult(ExportResult.Success);
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
this.WasShutDown = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
}
}

View File

@ -0,0 +1,259 @@
// <copyright file="ActivityProcessorPipelineTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Testing.Export;
using OpenTelemetry.Trace.Configuration;
using OpenTelemetry.Trace.Export;
using Xunit;
namespace OpenTelemetry.Tests.Impl.Trace.Config
{
public class ActivityProcessorPipelineTests
{
[Fact]
public void PipelineBuilder_BadArgs()
{
Assert.Throws<ArgumentNullException>(() => new ActivityProcessorPipelineBuilder().AddProcessor(null));
Assert.Throws<ArgumentNullException>(() => new ActivityProcessorPipelineBuilder().SetExporter(null));
Assert.Throws<ArgumentNullException>(() => new ActivityProcessorPipelineBuilder().SetExportingProcessor(null));
}
[Fact]
public void PipelineBuilder_Defaults()
{
var builder = new ActivityProcessorPipelineBuilder();
Assert.Null(builder.Exporter);
Assert.Null(builder.Processors);
var processor = builder.Build();
Assert.Null(builder.Exporter);
Assert.Single(builder.Processors);
Assert.IsType<NoopActivityProcessor>(builder.Processors[0]);
Assert.Same(processor, builder.Processors[0]);
}
[Fact]
public void PipelineBuilder_AddExporter()
{
var builder = new ActivityProcessorPipelineBuilder();
var exporter = new TestActivityExporter(null);
builder.SetExporter(exporter);
Assert.Same(exporter, builder.Exporter);
var processor = builder.Build();
Assert.Single(builder.Processors);
Assert.IsType<SimpleActivityProcessor>(builder.Processors.Single());
Assert.Same(processor, builder.Processors[0]);
}
[Fact]
public void PipelineBuilder_AddExporterAndExportingProcessor()
{
var builder = new ActivityProcessorPipelineBuilder();
var exporter = new TestActivityExporter(null);
builder.SetExporter(exporter);
bool processorFactoryCalled = false;
builder.SetExportingProcessor(e =>
{
processorFactoryCalled = true;
return new SimpleActivityProcessor(e);
});
var processor = builder.Build();
Assert.Single(builder.Processors);
Assert.True(processorFactoryCalled);
Assert.IsType<SimpleActivityProcessor>(builder.Processors.Single());
Assert.Same(processor, builder.Processors[0]);
}
[Fact]
public void PipelineBuilder_AddExportingProcessor()
{
var builder = new ActivityProcessorPipelineBuilder();
bool processorFactoryCalled = false;
var processor = new TestProcessor();
builder.SetExportingProcessor(e =>
{
processorFactoryCalled = true;
Assert.Null(e);
return processor;
});
Assert.Same(processor, builder.Build());
Assert.Single(builder.Processors);
Assert.True(processorFactoryCalled);
Assert.Same(processor, builder.Processors.Single());
}
[Fact]
public void PipelineBuilder_AddProcessor()
{
var builder = new ActivityProcessorPipelineBuilder();
bool processorFactoryCalled = false;
var processor = new TestProcessor();
builder.AddProcessor(e =>
{
processorFactoryCalled = true;
return processor;
});
Assert.Same(processor, builder.Build());
Assert.Single(builder.Processors);
Assert.True(processorFactoryCalled);
Assert.Same(processor, builder.Processors.Single());
}
[Fact]
public void PipelineBuilder_AddProcessorChain()
{
var builder = new ActivityProcessorPipelineBuilder();
bool processorFactory1Called = false;
bool processorFactory2Called = false;
bool processorFactory3Called = false;
builder
.AddProcessor(next =>
{
processorFactory1Called = true;
Assert.NotNull(next);
return new TestProcessor(next, "1");
})
.AddProcessor(next =>
{
processorFactory2Called = true;
Assert.NotNull(next);
return new TestProcessor(next, "2");
})
.AddProcessor(next =>
{
processorFactory3Called = true;
Assert.Null(next);
return new TestProcessor(next, "3");
});
var firstProcessor = (TestProcessor)builder.Build();
Assert.Equal(3, builder.Processors.Count);
Assert.True(processorFactory1Called);
Assert.True(processorFactory2Called);
Assert.True(processorFactory3Called);
Assert.Equal("1", firstProcessor.Name);
var secondProcessor = (TestProcessor)firstProcessor.Next;
Assert.Equal("2", secondProcessor.Name);
var thirdProcessor = (TestProcessor)secondProcessor.Next;
Assert.Equal("3", thirdProcessor.Name);
}
[Fact]
public void PipelineBuilder_AddProcessorChainWithExporter()
{
var builder = new ActivityProcessorPipelineBuilder();
bool processorFactory1Called = false;
bool processorFactory2Called = false;
bool exportingFactory3Called = false;
builder
.AddProcessor(next =>
{
processorFactory1Called = true;
Assert.NotNull(next);
return new TestProcessor(next, "1");
})
.AddProcessor(next =>
{
processorFactory2Called = true;
Assert.NotNull(next);
return new TestProcessor(next, "2");
})
.SetExportingProcessor(exporter =>
{
exportingFactory3Called = true;
Assert.NotNull(exporter);
return new SimpleActivityProcessor(exporter);
})
.SetExporter(new TestActivityExporter(null));
var firstProcessor = (TestProcessor)builder.Build();
Assert.Equal(3, builder.Processors.Count);
Assert.True(processorFactory1Called);
Assert.True(processorFactory2Called);
Assert.True(exportingFactory3Called);
Assert.Equal("1", firstProcessor.Name);
var secondProcessor = (TestProcessor)firstProcessor.Next;
Assert.Equal("2", secondProcessor.Name);
var thirdProcessor = secondProcessor.Next;
Assert.IsType<SimpleActivityProcessor>(thirdProcessor);
}
private class TestProcessor : ActivityProcessor
{
public readonly ActivityProcessor Next;
public readonly string Name;
public TestProcessor()
{
this.Name = null;
this.Name = null;
}
public TestProcessor(ActivityProcessor next, string name)
{
this.Next = next;
this.Name = name;
}
public override void OnStart(Activity span)
{
}
public override void OnEnd(Activity span)
{
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
}
}
}

View File

@ -0,0 +1,196 @@
// <copyright file="BroadcastActivityProcessorTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Trace;
using OpenTelemetry.Trace.Configuration;
using OpenTelemetry.Trace.Export;
using OpenTelemetry.Trace.Export.Internal;
using Xunit;
namespace OpenTelemetry.Tests.Impl.Trace.Config
{
public class BroadcastActivityProcessorTests
{
[Fact]
public void BroadcastProcessor_BadArgs()
{
Assert.Throws<ArgumentNullException>(() => new BroadcastActivityProcessor(null));
Assert.Throws<ArgumentException>(() => new BroadcastActivityProcessor(new SimpleActivityProcessor[0]));
}
[Fact]
public void BroadcastProcessor_CallsAllProcessorSequentially()
{
bool start1Called = false;
bool start2Called = false;
bool end1Called = false;
bool end2Called = false;
var processor1 = new TestProcessor(
ss =>
{
start1Called = true;
Assert.False(start2Called);
Assert.False(end1Called);
Assert.False(end2Called);
}, se =>
{
end1Called = true;
Assert.True(start1Called);
Assert.True(start2Called);
Assert.False(end2Called);
});
var processor2 = new TestProcessor(
ss =>
{
start2Called = true;
Assert.True(start1Called);
Assert.False(end1Called);
Assert.False(end2Called);
}, se =>
{
end2Called = true;
Assert.True(start1Called);
Assert.True(start2Called);
Assert.True(end1Called);
});
var broadcastProcessor = new BroadcastActivityProcessor(new[] { processor1, processor2 });
var activity = new Activity("somename");
broadcastProcessor.OnStart(activity);
Assert.True(start1Called);
Assert.True(start2Called);
broadcastProcessor.OnEnd(activity);
Assert.True(end1Called);
Assert.True(end2Called);
}
[Fact]
public void BroadcastProcessor_OneProcessorThrows()
{
bool start1Called = false;
bool start2Called = false;
bool end1Called = false;
bool end2Called = false;
var processor1 = new TestProcessor(
ss =>
{
start1Called = true;
Assert.False(start2Called);
Assert.False(end1Called);
Assert.False(end2Called);
throw new Exception("Start exception");
}, se =>
{
end1Called = true;
Assert.True(start1Called);
Assert.True(start2Called);
Assert.False(end2Called);
throw new Exception("End exception");
});
var processor2 = new TestProcessor(
ss =>
{
start2Called = true;
Assert.True(start1Called);
Assert.False(end1Called);
Assert.False(end2Called);
}, se =>
{
end2Called = true;
Assert.True(start1Called);
Assert.True(start2Called);
Assert.True(end1Called);
});
var broadcastProcessor = new BroadcastActivityProcessor(new[] { processor1, processor2 });
var activity = new Activity("somename");
broadcastProcessor.OnStart(activity);
Assert.True(start1Called);
Assert.True(start2Called);
broadcastProcessor.OnEnd(activity);
Assert.True(end1Called);
Assert.True(end2Called);
}
[Fact]
public void BroadcastProcessor_ShutsDownAll()
{
var processor1 = new TestProcessor(null, null);
var processor2 = new TestProcessor(null, null);
var broadcastProcessor = new BroadcastActivityProcessor(new[] { processor1, processor2 });
broadcastProcessor.ShutdownAsync(default);
Assert.True(processor1.ShutdownCalled);
Assert.True(processor2.ShutdownCalled);
broadcastProcessor.Dispose();
Assert.True(processor1.DisposedCalled);
Assert.True(processor2.DisposedCalled);
}
private class TestProcessor : ActivityProcessor, IDisposable
{
private readonly Action<Activity> onStart;
private readonly Action<Activity> onEnd;
public TestProcessor(Action<Activity> onStart, Action<Activity> onEnd)
{
this.onStart = onStart;
this.onEnd = onEnd;
}
public bool ShutdownCalled { get; private set; } = false;
public bool DisposedCalled { get; private set; } = false;
public override void OnStart(Activity span)
{
this.onStart?.Invoke(span);
}
public override void OnEnd(Activity span)
{
this.onEnd?.Invoke(span);
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
this.ShutdownCalled = true;
#if NET452
return Task.FromResult(0);
#else
return Task.CompletedTask;
#endif
}
public void Dispose()
{
this.DisposedCalled = true;
}
}
}
}

View File

@ -0,0 +1,176 @@
// <copyright file="SimpleActivityProcessorTest.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Testing.Export;
using OpenTelemetry.Trace.Configuration;
using OpenTelemetry.Trace.Samplers;
using Xunit;
namespace OpenTelemetry.Trace.Export.Test
{
public class SimpleActivityProcessorTest : IDisposable
{
private const string SpanName1 = "MySpanName/1";
private const string SpanName2 = "MySpanName/2";
private const string ActivitySourceName = "defaultactivitysource";
private TestActivityExporter activityExporter;
private OpenTelemetrySdk openTelemetry;
private ActivitySource activitySource;
public SimpleActivityProcessorTest()
{
this.activityExporter = new TestActivityExporter(null);
this.openTelemetry = OpenTelemetrySdk.EnableOpenTelemetry(b => b
.AddActivitySource(ActivitySourceName)
.AddProcessorPipeline(p => p
.SetExporter(this.activityExporter)
.SetExportingProcessor(e => new SimpleActivityProcessor(e)))
.SetSampler(new AlwaysOnActivitySampler()));
this.activitySource = new ActivitySource(ActivitySourceName);
}
[Fact]
public void ThrowsOnNullExporter()
{
Assert.Throws<ArgumentNullException>(() => new SimpleActivityProcessor(null));
}
[Fact]
public void ThrowsInExporter()
{
this.activityExporter = new TestActivityExporter(_ => throw new ArgumentException("123"));
this.openTelemetry = OpenTelemetrySdk.EnableOpenTelemetry(b => b
.AddActivitySource("cijo")
.AddProcessorPipeline(p => p
.SetExporter(this.activityExporter)
.SetExportingProcessor(e => new SimpleActivityProcessor(e))));
ActivitySource source = new ActivitySource("cijo");
var activity = source.StartActivity("somename");
// does not throw
activity.Stop();
}
[Fact]
public void ProcessorDoesNotBlockOnExporter()
{
this.activityExporter = new TestActivityExporter(async _ => await Task.Delay(500));
this.openTelemetry = OpenTelemetrySdk.EnableOpenTelemetry(b => b
.AddActivitySource("cijo")
.AddProcessorPipeline(p => p
.SetExporter(this.activityExporter)
.SetExportingProcessor(e => new SimpleActivityProcessor(e))));
ActivitySource source = new ActivitySource("cijo");
var activity = source.StartActivity("somename");
// does not block
var sw = Stopwatch.StartNew();
activity.Stop();
sw.Stop();
Assert.InRange(sw.Elapsed, TimeSpan.Zero, TimeSpan.FromMilliseconds(100));
var exported = this.WaitForSpans(this.activityExporter, 1, TimeSpan.FromMilliseconds(600));
Assert.Single(exported);
}
[Fact]
public async Task ShutdownTwice()
{
var activityProcessor = new SimpleActivityProcessor(new TestActivityExporter(null));
await activityProcessor.ShutdownAsync(CancellationToken.None).ConfigureAwait(false);
// does not throw
await activityProcessor.ShutdownAsync(CancellationToken.None).ConfigureAwait(false);
}
[Fact]
public void ExportDifferentSampledSpans()
{
var span1 = this.CreateSampledEndedSpan(SpanName1);
var span2 = this.CreateSampledEndedSpan(SpanName2);
var exported = this.WaitForSpans(this.activityExporter, 2, TimeSpan.FromMilliseconds(100));
Assert.Equal(2, exported.Length);
Assert.Contains(span1, exported);
Assert.Contains(span2, exported);
}
[Fact(Skip = "Reenable once AlwaysParentActivitySampler is added")]
public void ExportNotSampledSpans()
{
var span1 = this.CreateNotSampledEndedSpan(SpanName1);
var span2 = this.CreateSampledEndedSpan(SpanName2);
// Spans are recorded and exported in the same order as they are ended, we test that a non
// sampled span is not exported by creating and ending a sampled span after a non sampled span
// and checking that the first exported span is the sampled span (the non sampled did not get
// exported).
var exported = this.WaitForSpans(this.activityExporter, 1, TimeSpan.FromMilliseconds(100));
// Need to check this because otherwise the variable span1 is unused, other option is to not
// have a span1 variable.
Assert.Single(exported);
Assert.Contains(span2, exported);
}
public void Dispose()
{
this.activityExporter.ShutdownAsync(CancellationToken.None);
Activity.Current = null;
}
private Activity CreateSampledEndedSpan(string spanName)
{
var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded);
var activity = this.activitySource.StartActivity(spanName, ActivityKind.Internal, context);
activity.Stop();
return activity;
}
private Activity CreateNotSampledEndedSpan(string spanName)
{
var context = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.None);
var activity = this.activitySource.StartActivity(spanName, ActivityKind.Internal, context);
activity.Stop();
return activity;
}
private Activity[] WaitForSpans(TestActivityExporter exporter, int spanCount, TimeSpan timeout)
{
Assert.True(
SpinWait.SpinUntil(
() =>
{
Thread.Sleep(0);
return exporter.ExportedSpans.Length >= spanCount;
}, timeout + TimeSpan.FromMilliseconds(20)));
return exporter.ExportedSpans;
}
}
}