* clean up

* simplify the code

* string.Empty

* fix nit
This commit is contained in:
Reiley Yang 2020-08-12 23:05:16 -07:00 committed by GitHub
parent 0af38acb9d
commit b5695edc43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 101 additions and 316 deletions

View File

@ -108,7 +108,7 @@ namespace OpenTelemetry.Trace
public override Task ForceFlushAsync(CancellationToken cancellationToken)
{
var cur = this.head;
var task = cur.Value.ShutdownAsync(cancellationToken);
var task = cur.Value.ForceFlushAsync(cancellationToken);
for (cur = cur.Next; cur != null; cur = cur.Next)
{

View File

@ -1,124 +0,0 @@
// <copyright file="FanOutActivityProcessor.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;
namespace OpenTelemetry.Trace.Internal
{
internal class FanOutActivityProcessor : ActivityProcessor
{
private readonly List<ActivityProcessor> processors;
private bool disposed;
public FanOutActivityProcessor(IEnumerable<ActivityProcessor> processors)
{
if (processors == null)
{
throw new ArgumentNullException(nameof(processors));
}
if (!processors.Any())
{
throw new ArgumentException($"{nameof(processors)} collection is empty");
}
this.processors = new List<ActivityProcessor>(processors);
}
public override void OnEnd(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnEnd(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnEnd", e);
}
}
}
public override void OnStart(Activity activity)
{
foreach (var processor in this.processors)
{
try
{
processor.OnStart(activity);
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("OnStart", e);
}
}
}
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>();
foreach (var processor in this.processors)
{
tasks.Add(processor.ShutdownAsync(cancellationToken));
}
return Task.WhenAll(tasks);
}
public override Task ForceFlushAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>(this.processors.Count());
foreach (var processor in this.processors)
{
tasks.Add(processor.ForceFlushAsync(cancellationToken));
}
return Task.WhenAll(tasks);
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (disposing && !this.disposed)
{
foreach (var processor in this.processors)
{
try
{
if (processor is IDisposable disposable)
{
disposable.Dispose();
}
}
catch (Exception e)
{
OpenTelemetrySdkEventSource.Log.SpanProcessorException("Dispose", e);
}
}
this.disposed = true;
}
}
}
}

View File

@ -1,22 +0,0 @@
// <copyright file="NoopActivityProcessor.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.Trace.Internal
{
internal sealed class NoopActivityProcessor : ActivityProcessor
{
}
}

View File

@ -30,7 +30,6 @@ namespace Benchmarks
public OpenTelemetrySdkBenchmarksActivity()
{
// Not configuring pipeline, which will result in default NoopActivityProcessor.
var openTel = Sdk.CreateTracerProviderBuilder()
.AddSource("BenchMark")
.Build();

View File

@ -0,0 +1,100 @@
// <copyright file="CompositeActivityProcessorTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Diagnostics;
using OpenTelemetry.Tests.Shared;
using Xunit;
namespace OpenTelemetry.Trace.Tests
{
public class CompositeActivityProcessorTests
{
[Fact]
public void CompositeActivityProcessor_BadArgs()
{
Assert.Throws<ArgumentNullException>(() => new CompositeActivityProcessor(null));
Assert.Throws<ArgumentException>(() => new CompositeActivityProcessor(new SimpleActivityProcessor[0]));
}
[Fact]
public void CompositeActivityProcessor_CallsAllProcessorSequentially()
{
var result = string.Empty;
var p1 = new TestActivityProcessor(
activity => { result += "1"; },
activity => { result += "3"; });
var p2 = new TestActivityProcessor(
activity => { result += "2"; },
activity => { result += "4"; });
var activity = new Activity("test");
using (var processor = new CompositeActivityProcessor(new[] { p1, p2 }))
{
processor.OnStart(activity);
processor.OnEnd(activity);
}
Assert.Equal("1234", result);
}
[Fact]
public void CompositeActivityProcessor_ProcessorThrows()
{
var p1 = new TestActivityProcessor(
activity => { throw new Exception("Start exception"); },
activity => { throw new Exception("End exception"); });
var activity = new Activity("test");
using (var processor = new CompositeActivityProcessor(new[] { p1 }))
{
Assert.Throws<Exception>(() => { processor.OnStart(activity); });
Assert.Throws<Exception>(() => { processor.OnEnd(activity); });
}
}
[Fact]
public void CompositeActivityProcessor_ShutsDownAll()
{
var p1 = new TestActivityProcessor(null, null);
var p2 = new TestActivityProcessor(null, null);
using (var processor = new CompositeActivityProcessor(new[] { p1, p2 }))
{
processor.ShutdownAsync(default).Wait();
Assert.True(p1.ShutdownCalled);
Assert.True(p2.ShutdownCalled);
}
}
[Fact]
public void CompositeActivityProcessor_ForceFlush()
{
var p1 = new TestActivityProcessor(null, null);
var p2 = new TestActivityProcessor(null, null);
using (var processor = new CompositeActivityProcessor(new[] { p1, p2 }))
{
processor.ForceFlushAsync(default).Wait();
Assert.True(p1.ForceFlushCalled);
Assert.True(p2.ForceFlushCalled);
}
}
}
}

View File

@ -1,168 +0,0 @@
// <copyright file="FanOutActivityProcessorTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Diagnostics;
using OpenTelemetry.Tests.Shared;
using OpenTelemetry.Trace.Internal;
using Xunit;
namespace OpenTelemetry.Trace.Tests
{
public class FanOutActivityProcessorTests
{
[Fact]
public void BroadcastProcessor_BadArgs()
{
Assert.Throws<ArgumentNullException>(() => new FanOutActivityProcessor(null));
Assert.Throws<ArgumentException>(() => new FanOutActivityProcessor(new SimpleActivityProcessor[0]));
}
[Fact]
public void BroadcastProcessor_CallsAllProcessorSequentially()
{
bool start1Called = false;
bool start2Called = false;
bool end1Called = false;
bool end2Called = false;
var processor1 = new TestActivityProcessor(
ss =>
{
start1Called = true;
Assert.False(start2Called);
Assert.False(end1Called);
Assert.False(end2Called);
}, se =>
{
end1Called = true;
Assert.True(start1Called);
Assert.True(start2Called);
Assert.False(end2Called);
});
var processor2 = new TestActivityProcessor(
ss =>
{
start2Called = true;
Assert.True(start1Called);
Assert.False(end1Called);
Assert.False(end2Called);
}, se =>
{
end2Called = true;
Assert.True(start1Called);
Assert.True(start2Called);
Assert.True(end1Called);
});
var broadcastProcessor = new FanOutActivityProcessor(new[] { processor1, processor2 });
var activity = new Activity("somename");
broadcastProcessor.OnStart(activity);
Assert.True(start1Called);
Assert.True(start2Called);
broadcastProcessor.OnEnd(activity);
Assert.True(end1Called);
Assert.True(end2Called);
}
[Fact]
public void BroadcastProcessor_OneProcessorThrows()
{
bool start1Called = false;
bool start2Called = false;
bool end1Called = false;
bool end2Called = false;
var processor1 = new TestActivityProcessor(
ss =>
{
start1Called = true;
Assert.False(start2Called);
Assert.False(end1Called);
Assert.False(end2Called);
throw new Exception("Start exception");
}, se =>
{
end1Called = true;
Assert.True(start1Called);
Assert.True(start2Called);
Assert.False(end2Called);
throw new Exception("End exception");
});
var processor2 = new TestActivityProcessor(
ss =>
{
start2Called = true;
Assert.True(start1Called);
Assert.False(end1Called);
Assert.False(end2Called);
}, se =>
{
end2Called = true;
Assert.True(start1Called);
Assert.True(start2Called);
Assert.True(end1Called);
});
var broadcastProcessor = new FanOutActivityProcessor(new[] { processor1, processor2 });
var activity = new Activity("somename");
broadcastProcessor.OnStart(activity);
Assert.True(start1Called);
Assert.True(start2Called);
broadcastProcessor.OnEnd(activity);
Assert.True(end1Called);
Assert.True(end2Called);
}
[Fact]
public void BroadcastProcessor_ShutsDownAll()
{
var processor1 = new TestActivityProcessor(null, null);
var processor2 = new TestActivityProcessor(null, null);
var broadcastProcessor = new FanOutActivityProcessor(new[] { processor1, processor2 });
broadcastProcessor.ShutdownAsync(default);
Assert.True(processor1.ShutdownCalled);
Assert.True(processor2.ShutdownCalled);
broadcastProcessor.Dispose();
Assert.True(processor1.DisposedCalled);
Assert.True(processor2.DisposedCalled);
}
[Fact]
public void BroadcastProcessor_ForceFlush()
{
var processor1 = new TestActivityProcessor(null, null);
var processor2 = new TestActivityProcessor(null, null);
var broadcastProcessor = new FanOutActivityProcessor(new[] { processor1, processor2 });
broadcastProcessor.ForceFlushAsync(default);
Assert.True(processor1.ForceFlushCalled);
Assert.True(processor2.ForceFlushCalled);
broadcastProcessor.Dispose();
Assert.True(processor1.DisposedCalled);
Assert.True(processor2.DisposedCalled);
}
}
}