Introduce a ReadWriteSpan interface and pass it to the onStart of the SpanProcessor (#1574)
* start working on a ReadWriteSpan for startSpan * Finish converting the disruptor and the unit tests to the new model. * Add some simple javadoc to the ReadWriteSpan interface
This commit is contained in:
parent
bea5492e6d
commit
7e87a69ae0
|
|
@ -42,7 +42,7 @@ public final class MultiSpanProcessor implements SpanProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStart(ReadableSpan readableSpan) {
|
public void onStart(ReadWriteSpan readableSpan) {
|
||||||
for (SpanProcessor spanProcessor : spanProcessorsStart) {
|
for (SpanProcessor spanProcessor : spanProcessorsStart) {
|
||||||
spanProcessor.onStart(readableSpan);
|
spanProcessor.onStart(readableSpan);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ final class NoopSpanProcessor implements SpanProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStart(ReadableSpan span) {}
|
public void onStart(ReadWriteSpan span) {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isStartRequired() {
|
public boolean isStartRequired() {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,25 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2020, OpenTelemetry Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.sdk.trace;
|
||||||
|
|
||||||
|
import io.opentelemetry.trace.Span;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A combination of the write methods from the {@link Span} interface and the read methods from the
|
||||||
|
* {@link ReadableSpan} interface.
|
||||||
|
*/
|
||||||
|
public interface ReadWriteSpan extends Span, ReadableSpan {}
|
||||||
|
|
@ -51,7 +51,7 @@ import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
/** Implementation for the {@link Span} class that records trace events. */
|
/** Implementation for the {@link Span} class that records trace events. */
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
final class RecordEventsReadableSpan implements ReadableSpan, Span {
|
final class RecordEventsReadableSpan implements ReadWriteSpan {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(Tracer.class.getName());
|
private static final Logger logger = Logger.getLogger(Tracer.class.getName());
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ public interface SpanProcessor {
|
||||||
*
|
*
|
||||||
* @param span the {@code ReadableSpan} that just started.
|
* @param span the {@code ReadableSpan} that just started.
|
||||||
*/
|
*/
|
||||||
void onStart(ReadableSpan span);
|
void onStart(ReadWriteSpan span);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns {@code true} if this {@link SpanProcessor} requires start events.
|
* Returns {@code true} if this {@link SpanProcessor} requires start events.
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import io.opentelemetry.metrics.Meter;
|
||||||
import io.opentelemetry.sdk.common.DaemonThreadFactory;
|
import io.opentelemetry.sdk.common.DaemonThreadFactory;
|
||||||
import io.opentelemetry.sdk.common.export.CompletableResultCode;
|
import io.opentelemetry.sdk.common.export.CompletableResultCode;
|
||||||
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
||||||
|
import io.opentelemetry.sdk.trace.ReadWriteSpan;
|
||||||
import io.opentelemetry.sdk.trace.ReadableSpan;
|
import io.opentelemetry.sdk.trace.ReadableSpan;
|
||||||
import io.opentelemetry.sdk.trace.SpanProcessor;
|
import io.opentelemetry.sdk.trace.SpanProcessor;
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||||
|
|
@ -110,7 +111,7 @@ public final class BatchSpanProcessor implements SpanProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStart(ReadableSpan span) {}
|
public void onStart(ReadWriteSpan span) {}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isStartRequired() {
|
public boolean isStartRequired() {
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ package io.opentelemetry.sdk.trace.export;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import io.opentelemetry.sdk.common.export.CompletableResultCode;
|
import io.opentelemetry.sdk.common.export.CompletableResultCode;
|
||||||
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
||||||
|
import io.opentelemetry.sdk.trace.ReadWriteSpan;
|
||||||
import io.opentelemetry.sdk.trace.ReadableSpan;
|
import io.opentelemetry.sdk.trace.ReadableSpan;
|
||||||
import io.opentelemetry.sdk.trace.SpanProcessor;
|
import io.opentelemetry.sdk.trace.SpanProcessor;
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||||
|
|
@ -64,7 +65,7 @@ public final class SimpleSpanProcessor implements SpanProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStart(ReadableSpan span) {
|
public void onStart(ReadWriteSpan span) {
|
||||||
// Do nothing.
|
// Do nothing.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ class MultiSpanProcessorTest {
|
||||||
@Mock private SpanProcessor spanProcessor1;
|
@Mock private SpanProcessor spanProcessor1;
|
||||||
@Mock private SpanProcessor spanProcessor2;
|
@Mock private SpanProcessor spanProcessor2;
|
||||||
@Mock private ReadableSpan readableSpan;
|
@Mock private ReadableSpan readableSpan;
|
||||||
|
@Mock private ReadWriteSpan readWriteSpan;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
void setUp() {
|
||||||
|
|
@ -48,7 +49,7 @@ class MultiSpanProcessorTest {
|
||||||
@Test
|
@Test
|
||||||
void empty() {
|
void empty() {
|
||||||
SpanProcessor multiSpanProcessor = MultiSpanProcessor.create(Collections.emptyList());
|
SpanProcessor multiSpanProcessor = MultiSpanProcessor.create(Collections.emptyList());
|
||||||
multiSpanProcessor.onStart(readableSpan);
|
multiSpanProcessor.onStart(readWriteSpan);
|
||||||
multiSpanProcessor.onEnd(readableSpan);
|
multiSpanProcessor.onEnd(readableSpan);
|
||||||
multiSpanProcessor.shutdown();
|
multiSpanProcessor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
@ -57,8 +58,8 @@ class MultiSpanProcessorTest {
|
||||||
void oneSpanProcessor() {
|
void oneSpanProcessor() {
|
||||||
SpanProcessor multiSpanProcessor =
|
SpanProcessor multiSpanProcessor =
|
||||||
MultiSpanProcessor.create(Collections.singletonList(spanProcessor1));
|
MultiSpanProcessor.create(Collections.singletonList(spanProcessor1));
|
||||||
multiSpanProcessor.onStart(readableSpan);
|
multiSpanProcessor.onStart(readWriteSpan);
|
||||||
verify(spanProcessor1).onStart(same(readableSpan));
|
verify(spanProcessor1).onStart(same(readWriteSpan));
|
||||||
|
|
||||||
multiSpanProcessor.onEnd(readableSpan);
|
multiSpanProcessor.onEnd(readableSpan);
|
||||||
verify(spanProcessor1).onEnd(same(readableSpan));
|
verify(spanProcessor1).onEnd(same(readableSpan));
|
||||||
|
|
@ -83,7 +84,7 @@ class MultiSpanProcessorTest {
|
||||||
assertThat(multiSpanProcessor.isStartRequired()).isFalse();
|
assertThat(multiSpanProcessor.isStartRequired()).isFalse();
|
||||||
assertThat(multiSpanProcessor.isEndRequired()).isFalse();
|
assertThat(multiSpanProcessor.isEndRequired()).isFalse();
|
||||||
|
|
||||||
multiSpanProcessor.onStart(readableSpan);
|
multiSpanProcessor.onStart(readWriteSpan);
|
||||||
verifyNoMoreInteractions(spanProcessor1);
|
verifyNoMoreInteractions(spanProcessor1);
|
||||||
|
|
||||||
multiSpanProcessor.onEnd(readableSpan);
|
multiSpanProcessor.onEnd(readableSpan);
|
||||||
|
|
@ -100,9 +101,9 @@ class MultiSpanProcessorTest {
|
||||||
void twoSpanProcessor() {
|
void twoSpanProcessor() {
|
||||||
SpanProcessor multiSpanProcessor =
|
SpanProcessor multiSpanProcessor =
|
||||||
MultiSpanProcessor.create(Arrays.asList(spanProcessor1, spanProcessor2));
|
MultiSpanProcessor.create(Arrays.asList(spanProcessor1, spanProcessor2));
|
||||||
multiSpanProcessor.onStart(readableSpan);
|
multiSpanProcessor.onStart(readWriteSpan);
|
||||||
verify(spanProcessor1).onStart(same(readableSpan));
|
verify(spanProcessor1).onStart(same(readWriteSpan));
|
||||||
verify(spanProcessor2).onStart(same(readableSpan));
|
verify(spanProcessor2).onStart(same(readWriteSpan));
|
||||||
|
|
||||||
multiSpanProcessor.onEnd(readableSpan);
|
multiSpanProcessor.onEnd(readableSpan);
|
||||||
verify(spanProcessor1).onEnd(same(readableSpan));
|
verify(spanProcessor1).onEnd(same(readableSpan));
|
||||||
|
|
@ -127,9 +128,9 @@ class MultiSpanProcessorTest {
|
||||||
assertThat(multiSpanProcessor.isStartRequired()).isTrue();
|
assertThat(multiSpanProcessor.isStartRequired()).isTrue();
|
||||||
assertThat(multiSpanProcessor.isEndRequired()).isTrue();
|
assertThat(multiSpanProcessor.isEndRequired()).isTrue();
|
||||||
|
|
||||||
multiSpanProcessor.onStart(readableSpan);
|
multiSpanProcessor.onStart(readWriteSpan);
|
||||||
verify(spanProcessor1).onStart(same(readableSpan));
|
verify(spanProcessor1).onStart(same(readWriteSpan));
|
||||||
verify(spanProcessor2, times(0)).onStart(any(ReadableSpan.class));
|
verify(spanProcessor2, times(0)).onStart(any(ReadWriteSpan.class));
|
||||||
|
|
||||||
multiSpanProcessor.onEnd(readableSpan);
|
multiSpanProcessor.onEnd(readableSpan);
|
||||||
verify(spanProcessor1, times(0)).onEnd(any(ReadableSpan.class));
|
verify(spanProcessor1, times(0)).onEnd(any(ReadableSpan.class));
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ import org.mockito.MockitoAnnotations;
|
||||||
/** Unit tests for {@link NoopSpanProcessorTest}. */
|
/** Unit tests for {@link NoopSpanProcessorTest}. */
|
||||||
class NoopSpanProcessorTest {
|
class NoopSpanProcessorTest {
|
||||||
@Mock private ReadableSpan readableSpan;
|
@Mock private ReadableSpan readableSpan;
|
||||||
|
@Mock private ReadWriteSpan readWriteSpan;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
void setUp() {
|
||||||
|
|
@ -35,7 +36,7 @@ class NoopSpanProcessorTest {
|
||||||
@Test
|
@Test
|
||||||
void noCrash() {
|
void noCrash() {
|
||||||
SpanProcessor noopSpanProcessor = NoopSpanProcessor.getInstance();
|
SpanProcessor noopSpanProcessor = NoopSpanProcessor.getInstance();
|
||||||
noopSpanProcessor.onStart(readableSpan);
|
noopSpanProcessor.onStart(readWriteSpan);
|
||||||
assertThat(noopSpanProcessor.isStartRequired()).isFalse();
|
assertThat(noopSpanProcessor.isStartRequired()).isFalse();
|
||||||
noopSpanProcessor.onEnd(readableSpan);
|
noopSpanProcessor.onEnd(readableSpan);
|
||||||
assertThat(noopSpanProcessor.isEndRequired()).isFalse();
|
assertThat(noopSpanProcessor.isEndRequired()).isFalse();
|
||||||
|
|
|
||||||
|
|
@ -160,7 +160,7 @@ class TracerSdkTest {
|
||||||
private final AtomicLong numberOfSpansFinished = new AtomicLong();
|
private final AtomicLong numberOfSpansFinished = new AtomicLong();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStart(ReadableSpan span) {
|
public void onStart(ReadWriteSpan span) {
|
||||||
numberOfSpansStarted.incrementAndGet();
|
numberOfSpansStarted.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import io.opentelemetry.sdk.common.export.CompletableResultCode;
|
import io.opentelemetry.sdk.common.export.CompletableResultCode;
|
||||||
import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester;
|
import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester;
|
||||||
|
import io.opentelemetry.sdk.trace.ReadWriteSpan;
|
||||||
import io.opentelemetry.sdk.trace.ReadableSpan;
|
import io.opentelemetry.sdk.trace.ReadableSpan;
|
||||||
import io.opentelemetry.sdk.trace.Samplers;
|
import io.opentelemetry.sdk.trace.Samplers;
|
||||||
import io.opentelemetry.sdk.trace.TestUtils;
|
import io.opentelemetry.sdk.trace.TestUtils;
|
||||||
|
|
@ -52,6 +53,7 @@ class SimpleSpanProcessorTest {
|
||||||
private static final long MAX_SCHEDULE_DELAY_MILLIS = 500;
|
private static final long MAX_SCHEDULE_DELAY_MILLIS = 500;
|
||||||
private static final String SPAN_NAME = "MySpanName";
|
private static final String SPAN_NAME = "MySpanName";
|
||||||
@Mock private ReadableSpan readableSpan;
|
@Mock private ReadableSpan readableSpan;
|
||||||
|
@Mock private ReadWriteSpan readWriteSpan;
|
||||||
@Mock private SpanExporter spanExporter;
|
@Mock private SpanExporter spanExporter;
|
||||||
private final TracerSdkProvider tracerSdkFactory = TracerSdkProvider.builder().build();
|
private final TracerSdkProvider tracerSdkFactory = TracerSdkProvider.builder().build();
|
||||||
private final Tracer tracer = tracerSdkFactory.get("SimpleSpanProcessor");
|
private final Tracer tracer = tracerSdkFactory.get("SimpleSpanProcessor");
|
||||||
|
|
@ -92,7 +94,7 @@ class SimpleSpanProcessorTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void onStartSync() {
|
void onStartSync() {
|
||||||
simpleSampledSpansProcessor.onStart(readableSpan);
|
simpleSampledSpansProcessor.onStart(readWriteSpan);
|
||||||
verifyNoInteractions(spanExporter);
|
verifyNoInteractions(spanExporter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
|
||||||
import com.lmax.disruptor.SleepingWaitStrategy;
|
import com.lmax.disruptor.SleepingWaitStrategy;
|
||||||
import com.lmax.disruptor.WaitStrategy;
|
import com.lmax.disruptor.WaitStrategy;
|
||||||
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
||||||
|
import io.opentelemetry.sdk.trace.ReadWriteSpan;
|
||||||
import io.opentelemetry.sdk.trace.ReadableSpan;
|
import io.opentelemetry.sdk.trace.ReadableSpan;
|
||||||
import io.opentelemetry.sdk.trace.SpanProcessor;
|
import io.opentelemetry.sdk.trace.SpanProcessor;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
@ -64,7 +65,7 @@ public final class DisruptorAsyncSpanProcessor implements SpanProcessor {
|
||||||
// TODO: Add metrics for dropped spans.
|
// TODO: Add metrics for dropped spans.
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStart(ReadableSpan span) {
|
public void onStart(ReadWriteSpan span) {
|
||||||
if (!startRequired) {
|
if (!startRequired) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import com.lmax.disruptor.WaitStrategy;
|
||||||
import com.lmax.disruptor.dsl.Disruptor;
|
import com.lmax.disruptor.dsl.Disruptor;
|
||||||
import com.lmax.disruptor.dsl.ProducerType;
|
import com.lmax.disruptor.dsl.ProducerType;
|
||||||
import io.opentelemetry.sdk.common.DaemonThreadFactory;
|
import io.opentelemetry.sdk.common.DaemonThreadFactory;
|
||||||
|
import io.opentelemetry.sdk.trace.ReadWriteSpan;
|
||||||
import io.opentelemetry.sdk.trace.ReadableSpan;
|
import io.opentelemetry.sdk.trace.ReadableSpan;
|
||||||
import io.opentelemetry.sdk.trace.SpanProcessor;
|
import io.opentelemetry.sdk.trace.SpanProcessor;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
@ -41,18 +42,17 @@ import javax.annotation.concurrent.ThreadSafe;
|
||||||
final class DisruptorEventQueue {
|
final class DisruptorEventQueue {
|
||||||
private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName());
|
private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName());
|
||||||
private static final String WORKER_THREAD_NAME = "DisruptorEventQueue_WorkerThread";
|
private static final String WORKER_THREAD_NAME = "DisruptorEventQueue_WorkerThread";
|
||||||
private static final EventTranslatorThreeArg<
|
private static final EventTranslatorThreeArg<DisruptorEvent, EventType, Object, CountDownLatch>
|
||||||
DisruptorEvent, EventType, ReadableSpan, CountDownLatch>
|
|
||||||
TRANSLATOR_THREE_ARG =
|
TRANSLATOR_THREE_ARG =
|
||||||
new EventTranslatorThreeArg<DisruptorEvent, EventType, ReadableSpan, CountDownLatch>() {
|
new EventTranslatorThreeArg<DisruptorEvent, EventType, Object, CountDownLatch>() {
|
||||||
@Override
|
@Override
|
||||||
public void translateTo(
|
public void translateTo(
|
||||||
DisruptorEvent event,
|
DisruptorEvent event,
|
||||||
long sequence,
|
long sequence,
|
||||||
EventType arg0,
|
EventType eventType,
|
||||||
ReadableSpan arg1,
|
Object span,
|
||||||
CountDownLatch arg2) {
|
CountDownLatch countDownLatch) {
|
||||||
event.setEntry(arg0, arg1, arg2);
|
event.setEntry(eventType, span, countDownLatch);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
private static final EventFactory<DisruptorEvent> EVENT_FACTORY =
|
private static final EventFactory<DisruptorEvent> EVENT_FACTORY =
|
||||||
|
|
@ -99,7 +99,7 @@ final class DisruptorEventQueue {
|
||||||
this.blocking = blocking;
|
this.blocking = blocking;
|
||||||
}
|
}
|
||||||
|
|
||||||
void enqueueStartEvent(ReadableSpan span) {
|
void enqueueStartEvent(ReadWriteSpan span) {
|
||||||
if (isShutdown) {
|
if (isShutdown) {
|
||||||
if (!loggedShutdownMessage.getAndSet(true)) {
|
if (!loggedShutdownMessage.getAndSet(true)) {
|
||||||
logger.info("Attempted to enqueue start event after Disruptor shutdown.");
|
logger.info("Attempted to enqueue start event after Disruptor shutdown.");
|
||||||
|
|
@ -156,33 +156,31 @@ final class DisruptorEventQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueues an event on the {@link DisruptorEventQueue}.
|
// Enqueues an event on the {@link DisruptorEventQueue}.
|
||||||
private void enqueue(EventType eventType, ReadableSpan readableSpan, CountDownLatch flushLatch) {
|
private void enqueue(EventType eventType, Object span, CountDownLatch flushLatch) {
|
||||||
if (blocking) {
|
if (blocking) {
|
||||||
ringBuffer.publishEvent(TRANSLATOR_THREE_ARG, eventType, readableSpan, flushLatch);
|
ringBuffer.publishEvent(TRANSLATOR_THREE_ARG, eventType, span, flushLatch);
|
||||||
} else {
|
} else {
|
||||||
// TODO: Record metrics if element not added.
|
// TODO: Record metrics if element not added.
|
||||||
ringBuffer.tryPublishEvent(TRANSLATOR_THREE_ARG, eventType, readableSpan, flushLatch);
|
ringBuffer.tryPublishEvent(TRANSLATOR_THREE_ARG, eventType, span, flushLatch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
|
// An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry.
|
||||||
private static final class DisruptorEvent {
|
private static final class DisruptorEvent {
|
||||||
@Nullable private ReadableSpan readableSpan = null;
|
@Nullable private Object span = null;
|
||||||
@Nullable private EventType eventType = null;
|
@Nullable private EventType eventType = null;
|
||||||
@Nullable private CountDownLatch waitingCounter = null;
|
@Nullable private CountDownLatch waitingCounter = null;
|
||||||
|
|
||||||
void setEntry(
|
void setEntry(
|
||||||
@Nullable EventType eventType,
|
@Nullable EventType eventType, @Nullable Object span, @Nullable CountDownLatch flushLatch) {
|
||||||
@Nullable ReadableSpan readableSpan,
|
this.span = span;
|
||||||
@Nullable CountDownLatch flushLatch) {
|
|
||||||
this.readableSpan = readableSpan;
|
|
||||||
this.eventType = eventType;
|
this.eventType = eventType;
|
||||||
this.waitingCounter = flushLatch;
|
this.waitingCounter = flushLatch;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
ReadableSpan getReadableSpan() {
|
Object getSpan() {
|
||||||
return readableSpan;
|
return span;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
|
|
@ -206,7 +204,7 @@ final class DisruptorEventQueue {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) {
|
public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) {
|
||||||
final ReadableSpan readableSpan = event.getReadableSpan();
|
final Object readableSpan = event.getSpan();
|
||||||
final EventType eventType = event.getEventType();
|
final EventType eventType = event.getEventType();
|
||||||
if (eventType == null) {
|
if (eventType == null) {
|
||||||
logger.warning("Disruptor enqueued null element type.");
|
logger.warning("Disruptor enqueued null element type.");
|
||||||
|
|
@ -215,10 +213,10 @@ final class DisruptorEventQueue {
|
||||||
try {
|
try {
|
||||||
switch (eventType) {
|
switch (eventType) {
|
||||||
case ON_START:
|
case ON_START:
|
||||||
spanProcessor.onStart(readableSpan);
|
spanProcessor.onStart((ReadWriteSpan) readableSpan);
|
||||||
break;
|
break;
|
||||||
case ON_END:
|
case ON_END:
|
||||||
spanProcessor.onEnd(readableSpan);
|
spanProcessor.onEnd((ReadableSpan) readableSpan);
|
||||||
break;
|
break;
|
||||||
case ON_SHUTDOWN:
|
case ON_SHUTDOWN:
|
||||||
spanProcessor.shutdown();
|
spanProcessor.shutdown();
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
||||||
import io.opentelemetry.sdk.trace.MultiSpanProcessor;
|
import io.opentelemetry.sdk.trace.MultiSpanProcessor;
|
||||||
|
import io.opentelemetry.sdk.trace.ReadWriteSpan;
|
||||||
import io.opentelemetry.sdk.trace.ReadableSpan;
|
import io.opentelemetry.sdk.trace.ReadableSpan;
|
||||||
import io.opentelemetry.sdk.trace.SpanProcessor;
|
import io.opentelemetry.sdk.trace.SpanProcessor;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
@ -38,6 +39,7 @@ class DisruptorAsyncSpanProcessorTest {
|
||||||
private static final boolean NOT_REQUIRED = false;
|
private static final boolean NOT_REQUIRED = false;
|
||||||
|
|
||||||
@Mock private ReadableSpan readableSpan;
|
@Mock private ReadableSpan readableSpan;
|
||||||
|
@Mock private ReadWriteSpan readWriteSpan;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
void setUp() {
|
||||||
|
|
@ -62,7 +64,7 @@ class DisruptorAsyncSpanProcessorTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStart(ReadableSpan span) {
|
public void onStart(ReadWriteSpan span) {
|
||||||
counterOnStart.incrementAndGet();
|
counterOnStart.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -128,7 +130,7 @@ class DisruptorAsyncSpanProcessorTest {
|
||||||
assertThat(disruptorAsyncSpanProcessor.isEndRequired()).isTrue();
|
assertThat(disruptorAsyncSpanProcessor.isEndRequired()).isTrue();
|
||||||
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
|
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
|
||||||
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
|
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
|
||||||
disruptorAsyncSpanProcessor.onStart(readableSpan);
|
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
|
||||||
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
||||||
disruptorAsyncSpanProcessor.forceFlush();
|
disruptorAsyncSpanProcessor.forceFlush();
|
||||||
disruptorAsyncSpanProcessor.shutdown();
|
disruptorAsyncSpanProcessor.shutdown();
|
||||||
|
|
@ -148,7 +150,7 @@ class DisruptorAsyncSpanProcessorTest {
|
||||||
assertThat(disruptorAsyncSpanProcessor.isEndRequired()).isTrue();
|
assertThat(disruptorAsyncSpanProcessor.isEndRequired()).isTrue();
|
||||||
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
|
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
|
||||||
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
|
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
|
||||||
disruptorAsyncSpanProcessor.onStart(readableSpan);
|
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
|
||||||
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
||||||
disruptorAsyncSpanProcessor.forceFlush();
|
disruptorAsyncSpanProcessor.forceFlush();
|
||||||
disruptorAsyncSpanProcessor.shutdown();
|
disruptorAsyncSpanProcessor.shutdown();
|
||||||
|
|
@ -168,7 +170,7 @@ class DisruptorAsyncSpanProcessorTest {
|
||||||
assertThat(disruptorAsyncSpanProcessor.isEndRequired()).isFalse();
|
assertThat(disruptorAsyncSpanProcessor.isEndRequired()).isFalse();
|
||||||
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
|
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
|
||||||
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
|
assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0);
|
||||||
disruptorAsyncSpanProcessor.onStart(readableSpan);
|
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
|
||||||
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
||||||
disruptorAsyncSpanProcessor.forceFlush();
|
disruptorAsyncSpanProcessor.forceFlush();
|
||||||
disruptorAsyncSpanProcessor.shutdown();
|
disruptorAsyncSpanProcessor.shutdown();
|
||||||
|
|
@ -197,7 +199,7 @@ class DisruptorAsyncSpanProcessorTest {
|
||||||
DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor =
|
DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor =
|
||||||
DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build();
|
DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build();
|
||||||
disruptorAsyncSpanProcessor.shutdown();
|
disruptorAsyncSpanProcessor.shutdown();
|
||||||
disruptorAsyncSpanProcessor.onStart(readableSpan);
|
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
|
||||||
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
||||||
disruptorAsyncSpanProcessor.forceFlush();
|
disruptorAsyncSpanProcessor.forceFlush();
|
||||||
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
|
assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0);
|
||||||
|
|
@ -214,7 +216,7 @@ class DisruptorAsyncSpanProcessorTest {
|
||||||
DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor =
|
DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor =
|
||||||
DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build();
|
DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build();
|
||||||
for (int i = 1; i <= tenK; i++) {
|
for (int i = 1; i <= tenK; i++) {
|
||||||
disruptorAsyncSpanProcessor.onStart(readableSpan);
|
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
|
||||||
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
||||||
if (i % 10 == 0) {
|
if (i % 10 == 0) {
|
||||||
disruptorAsyncSpanProcessor.forceFlush();
|
disruptorAsyncSpanProcessor.forceFlush();
|
||||||
|
|
@ -236,7 +238,7 @@ class DisruptorAsyncSpanProcessorTest {
|
||||||
MultiSpanProcessor.create(
|
MultiSpanProcessor.create(
|
||||||
Arrays.asList(incrementSpanProcessor1, incrementSpanProcessor2)))
|
Arrays.asList(incrementSpanProcessor1, incrementSpanProcessor2)))
|
||||||
.build();
|
.build();
|
||||||
disruptorAsyncSpanProcessor.onStart(readableSpan);
|
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
|
||||||
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
||||||
disruptorAsyncSpanProcessor.shutdown();
|
disruptorAsyncSpanProcessor.shutdown();
|
||||||
assertThat(incrementSpanProcessor1.getCounterOnStart()).isEqualTo(1);
|
assertThat(incrementSpanProcessor1.getCounterOnStart()).isEqualTo(1);
|
||||||
|
|
@ -256,7 +258,7 @@ class DisruptorAsyncSpanProcessorTest {
|
||||||
DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor =
|
DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor =
|
||||||
DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build();
|
DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build();
|
||||||
for (int i = 1; i <= tenK; i++) {
|
for (int i = 1; i <= tenK; i++) {
|
||||||
disruptorAsyncSpanProcessor.onStart(readableSpan);
|
disruptorAsyncSpanProcessor.onStart(readWriteSpan);
|
||||||
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
disruptorAsyncSpanProcessor.onEnd(readableSpan);
|
||||||
if (i % 100 == 0) {
|
if (i % 100 == 0) {
|
||||||
disruptorAsyncSpanProcessor.forceFlush();
|
disruptorAsyncSpanProcessor.forceFlush();
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@
|
||||||
package io.opentelemetry.sdk.extensions.zpages;
|
package io.opentelemetry.sdk.extensions.zpages;
|
||||||
|
|
||||||
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
import io.opentelemetry.sdk.common.export.ConfigBuilder;
|
||||||
|
import io.opentelemetry.sdk.trace.ReadWriteSpan;
|
||||||
import io.opentelemetry.sdk.trace.ReadableSpan;
|
import io.opentelemetry.sdk.trace.ReadableSpan;
|
||||||
import io.opentelemetry.sdk.trace.SpanProcessor;
|
import io.opentelemetry.sdk.trace.SpanProcessor;
|
||||||
import io.opentelemetry.trace.SpanId;
|
import io.opentelemetry.trace.SpanId;
|
||||||
|
|
@ -64,7 +65,7 @@ final class TracezSpanProcessor implements SpanProcessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onStart(ReadableSpan span) {
|
public void onStart(ReadWriteSpan span) {
|
||||||
runningSpanCache.put(span.getSpanContext().getSpanId(), span);
|
runningSpanCache.put(span.getSpanContext().getSpanId(), span);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ package io.opentelemetry.sdk.extensions.zpages;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import io.opentelemetry.sdk.trace.ReadWriteSpan;
|
||||||
import io.opentelemetry.sdk.trace.ReadableSpan;
|
import io.opentelemetry.sdk.trace.ReadableSpan;
|
||||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||||
import io.opentelemetry.trace.SpanContext;
|
import io.opentelemetry.trace.SpanContext;
|
||||||
|
|
@ -59,14 +60,15 @@ class TracezSpanProcessorTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Mock private ReadableSpan readableSpan;
|
@Mock private ReadableSpan readableSpan;
|
||||||
|
@Mock private ReadWriteSpan readWriteSpan;
|
||||||
@Mock private SpanData spanData;
|
@Mock private SpanData spanData;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void onStart_sampledSpan_inCache() {
|
void onStart_sampledSpan_inCache() {
|
||||||
TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build();
|
TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build();
|
||||||
/* Return a sampled span, which should be added to the running cache by default */
|
/* Return a sampled span, which should be added to the running cache by default */
|
||||||
when(readableSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT);
|
when(readWriteSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT);
|
||||||
spanProcessor.onStart(readableSpan);
|
spanProcessor.onStart(readWriteSpan);
|
||||||
assertSpanCacheSizes(spanProcessor, 1, 0);
|
assertSpanCacheSizes(spanProcessor, 1, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -74,9 +76,12 @@ class TracezSpanProcessorTest {
|
||||||
void onEnd_sampledSpan_inCache() {
|
void onEnd_sampledSpan_inCache() {
|
||||||
TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build();
|
TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build();
|
||||||
/* Return a sampled span, which should be added to the completed cache upon ending */
|
/* Return a sampled span, which should be added to the completed cache upon ending */
|
||||||
|
when(readWriteSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT);
|
||||||
|
when(readWriteSpan.getName()).thenReturn(SPAN_NAME);
|
||||||
|
spanProcessor.onStart(readWriteSpan);
|
||||||
|
|
||||||
when(readableSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT);
|
when(readableSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT);
|
||||||
when(readableSpan.getName()).thenReturn(SPAN_NAME);
|
when(readableSpan.getName()).thenReturn(SPAN_NAME);
|
||||||
spanProcessor.onStart(readableSpan);
|
|
||||||
when(readableSpan.toSpanData()).thenReturn(spanData);
|
when(readableSpan.toSpanData()).thenReturn(spanData);
|
||||||
when(spanData.getStatus()).thenReturn(SPAN_STATUS);
|
when(spanData.getStatus()).thenReturn(SPAN_STATUS);
|
||||||
spanProcessor.onEnd(readableSpan);
|
spanProcessor.onEnd(readableSpan);
|
||||||
|
|
@ -87,8 +92,8 @@ class TracezSpanProcessorTest {
|
||||||
void onStart_notSampledSpan_inCache() {
|
void onStart_notSampledSpan_inCache() {
|
||||||
TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build();
|
TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build();
|
||||||
/* Return a non-sampled span, which should not be added to the running cache by default */
|
/* Return a non-sampled span, which should not be added to the running cache by default */
|
||||||
when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
when(readWriteSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
||||||
spanProcessor.onStart(readableSpan);
|
spanProcessor.onStart(readWriteSpan);
|
||||||
assertSpanCacheSizes(spanProcessor, 1, 0);
|
assertSpanCacheSizes(spanProcessor, 1, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -96,8 +101,9 @@ class TracezSpanProcessorTest {
|
||||||
void onEnd_notSampledSpan_notInCache() {
|
void onEnd_notSampledSpan_notInCache() {
|
||||||
TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build();
|
TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build();
|
||||||
/* Return a non-sampled span, which should not be added to the running cache by default */
|
/* Return a non-sampled span, which should not be added to the running cache by default */
|
||||||
|
when(readWriteSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
||||||
when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
||||||
spanProcessor.onStart(readableSpan);
|
spanProcessor.onStart(readWriteSpan);
|
||||||
spanProcessor.onEnd(readableSpan);
|
spanProcessor.onEnd(readableSpan);
|
||||||
assertSpanCacheSizes(spanProcessor, 0, 0);
|
assertSpanCacheSizes(spanProcessor, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
@ -111,9 +117,10 @@ class TracezSpanProcessorTest {
|
||||||
TracezSpanProcessor.newBuilder().readProperties(properties).build();
|
TracezSpanProcessor.newBuilder().readProperties(properties).build();
|
||||||
|
|
||||||
/* Return a non-sampled span, which should not be added to the completed cache */
|
/* Return a non-sampled span, which should not be added to the completed cache */
|
||||||
when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
when(readWriteSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
||||||
spanProcessor.onStart(readableSpan);
|
spanProcessor.onStart(readWriteSpan);
|
||||||
assertSpanCacheSizes(spanProcessor, 1, 0);
|
assertSpanCacheSizes(spanProcessor, 1, 0);
|
||||||
|
when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
||||||
spanProcessor.onEnd(readableSpan);
|
spanProcessor.onEnd(readableSpan);
|
||||||
assertSpanCacheSizes(spanProcessor, 0, 0);
|
assertSpanCacheSizes(spanProcessor, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
@ -127,10 +134,13 @@ class TracezSpanProcessorTest {
|
||||||
TracezSpanProcessor.newBuilder().readProperties(properties).build();
|
TracezSpanProcessor.newBuilder().readProperties(properties).build();
|
||||||
|
|
||||||
/* Return a non-sampled span, which should be added to the caches */
|
/* Return a non-sampled span, which should be added to the caches */
|
||||||
when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
when(readWriteSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
||||||
when(readableSpan.getName()).thenReturn(SPAN_NAME);
|
spanProcessor.onStart(readWriteSpan);
|
||||||
spanProcessor.onStart(readableSpan);
|
|
||||||
assertSpanCacheSizes(spanProcessor, 1, 0);
|
assertSpanCacheSizes(spanProcessor, 1, 0);
|
||||||
|
|
||||||
|
when(readableSpan.getName()).thenReturn(SPAN_NAME);
|
||||||
|
when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT);
|
||||||
when(readableSpan.toSpanData()).thenReturn(spanData);
|
when(readableSpan.toSpanData()).thenReturn(spanData);
|
||||||
when(spanData.getStatus()).thenReturn(SPAN_STATUS);
|
when(spanData.getStatus()).thenReturn(SPAN_STATUS);
|
||||||
spanProcessor.onEnd(readableSpan);
|
spanProcessor.onEnd(readableSpan);
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue