From 7e87a69ae0889bfb3ad3bc828decd0ce9d0fa994 Mon Sep 17 00:00:00 2001 From: John Watson Date: Mon, 24 Aug 2020 07:48:27 -0700 Subject: [PATCH] Introduce a ReadWriteSpan interface and pass it to the onStart of the SpanProcessor (#1574) * start working on a ReadWriteSpan for startSpan * Finish converting the disruptor and the unit tests to the new model. * Add some simple javadoc to the ReadWriteSpan interface --- .../sdk/trace/MultiSpanProcessor.java | 2 +- .../sdk/trace/NoopSpanProcessor.java | 2 +- .../sdk/trace/ReadWriteSpan.java | 25 ++++++++++++ .../sdk/trace/RecordEventsReadableSpan.java | 2 +- .../sdk/trace/SpanProcessor.java | 2 +- .../sdk/trace/export/BatchSpanProcessor.java | 3 +- .../sdk/trace/export/SimpleSpanProcessor.java | 3 +- .../sdk/trace/MultiSpanProcessorTest.java | 21 +++++----- .../sdk/trace/NoopSpanProcessorTest.java | 3 +- .../sdk/trace/TracerSdkTest.java | 2 +- .../trace/export/SimpleSpanProcessorTest.java | 4 +- .../export/DisruptorAsyncSpanProcessor.java | 3 +- .../trace/export/DisruptorEventQueue.java | 40 +++++++++---------- .../DisruptorAsyncSpanProcessorTest.java | 18 +++++---- .../zpages/TracezSpanProcessor.java | 3 +- .../zpages/TracezSpanProcessorTest.java | 32 ++++++++++----- 16 files changed, 104 insertions(+), 61 deletions(-) create mode 100644 sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/ReadWriteSpan.java diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java index f4ca885af4..0194571b38 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/MultiSpanProcessor.java @@ -42,7 +42,7 @@ public final class MultiSpanProcessor implements SpanProcessor { } @Override - public void onStart(ReadableSpan readableSpan) { + public void onStart(ReadWriteSpan readableSpan) { for (SpanProcessor spanProcessor : spanProcessorsStart) { spanProcessor.onStart(readableSpan); } diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/NoopSpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/NoopSpanProcessor.java index c5063df61c..3b4abc3861 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/NoopSpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/NoopSpanProcessor.java @@ -24,7 +24,7 @@ final class NoopSpanProcessor implements SpanProcessor { } @Override - public void onStart(ReadableSpan span) {} + public void onStart(ReadWriteSpan span) {} @Override public boolean isStartRequired() { diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/ReadWriteSpan.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/ReadWriteSpan.java new file mode 100644 index 0000000000..b1abfaef54 --- /dev/null +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/ReadWriteSpan.java @@ -0,0 +1,25 @@ +/* + * Copyright 2020, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.opentelemetry.sdk.trace; + +import io.opentelemetry.trace.Span; + +/** + * A combination of the write methods from the {@link Span} interface and the read methods from the + * {@link ReadableSpan} interface. + */ +public interface ReadWriteSpan extends Span, ReadableSpan {} diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/RecordEventsReadableSpan.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/RecordEventsReadableSpan.java index 9413bee41f..af663fcf6d 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/RecordEventsReadableSpan.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/RecordEventsReadableSpan.java @@ -51,7 +51,7 @@ import javax.annotation.concurrent.ThreadSafe; /** Implementation for the {@link Span} class that records trace events. */ @ThreadSafe -final class RecordEventsReadableSpan implements ReadableSpan, Span { +final class RecordEventsReadableSpan implements ReadWriteSpan { private static final Logger logger = Logger.getLogger(Tracer.class.getName()); diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java index f49f3a979a..341f726d42 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/SpanProcessor.java @@ -32,7 +32,7 @@ public interface SpanProcessor { * * @param span the {@code ReadableSpan} that just started. */ - void onStart(ReadableSpan span); + void onStart(ReadWriteSpan span); /** * Returns {@code true} if this {@link SpanProcessor} requires start events. diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java index fda22afb2a..95883c177e 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessor.java @@ -26,6 +26,7 @@ import io.opentelemetry.metrics.Meter; import io.opentelemetry.sdk.common.DaemonThreadFactory; import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; +import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; @@ -110,7 +111,7 @@ public final class BatchSpanProcessor implements SpanProcessor { } @Override - public void onStart(ReadableSpan span) {} + public void onStart(ReadWriteSpan span) {} @Override public boolean isStartRequired() { diff --git a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java index 2ab0a28319..7db54fbaa5 100644 --- a/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java +++ b/sdk/tracing/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java @@ -19,6 +19,7 @@ package io.opentelemetry.sdk.trace.export; import com.google.common.annotations.VisibleForTesting; import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilder; +import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.sdk.trace.data.SpanData; @@ -64,7 +65,7 @@ public final class SimpleSpanProcessor implements SpanProcessor { } @Override - public void onStart(ReadableSpan span) { + public void onStart(ReadWriteSpan span) { // Do nothing. } diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java index bc9b2514de..1f1b53dc12 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/MultiSpanProcessorTest.java @@ -35,6 +35,7 @@ class MultiSpanProcessorTest { @Mock private SpanProcessor spanProcessor1; @Mock private SpanProcessor spanProcessor2; @Mock private ReadableSpan readableSpan; + @Mock private ReadWriteSpan readWriteSpan; @BeforeEach void setUp() { @@ -48,7 +49,7 @@ class MultiSpanProcessorTest { @Test void empty() { SpanProcessor multiSpanProcessor = MultiSpanProcessor.create(Collections.emptyList()); - multiSpanProcessor.onStart(readableSpan); + multiSpanProcessor.onStart(readWriteSpan); multiSpanProcessor.onEnd(readableSpan); multiSpanProcessor.shutdown(); } @@ -57,8 +58,8 @@ class MultiSpanProcessorTest { void oneSpanProcessor() { SpanProcessor multiSpanProcessor = MultiSpanProcessor.create(Collections.singletonList(spanProcessor1)); - multiSpanProcessor.onStart(readableSpan); - verify(spanProcessor1).onStart(same(readableSpan)); + multiSpanProcessor.onStart(readWriteSpan); + verify(spanProcessor1).onStart(same(readWriteSpan)); multiSpanProcessor.onEnd(readableSpan); verify(spanProcessor1).onEnd(same(readableSpan)); @@ -83,7 +84,7 @@ class MultiSpanProcessorTest { assertThat(multiSpanProcessor.isStartRequired()).isFalse(); assertThat(multiSpanProcessor.isEndRequired()).isFalse(); - multiSpanProcessor.onStart(readableSpan); + multiSpanProcessor.onStart(readWriteSpan); verifyNoMoreInteractions(spanProcessor1); multiSpanProcessor.onEnd(readableSpan); @@ -100,9 +101,9 @@ class MultiSpanProcessorTest { void twoSpanProcessor() { SpanProcessor multiSpanProcessor = MultiSpanProcessor.create(Arrays.asList(spanProcessor1, spanProcessor2)); - multiSpanProcessor.onStart(readableSpan); - verify(spanProcessor1).onStart(same(readableSpan)); - verify(spanProcessor2).onStart(same(readableSpan)); + multiSpanProcessor.onStart(readWriteSpan); + verify(spanProcessor1).onStart(same(readWriteSpan)); + verify(spanProcessor2).onStart(same(readWriteSpan)); multiSpanProcessor.onEnd(readableSpan); verify(spanProcessor1).onEnd(same(readableSpan)); @@ -127,9 +128,9 @@ class MultiSpanProcessorTest { assertThat(multiSpanProcessor.isStartRequired()).isTrue(); assertThat(multiSpanProcessor.isEndRequired()).isTrue(); - multiSpanProcessor.onStart(readableSpan); - verify(spanProcessor1).onStart(same(readableSpan)); - verify(spanProcessor2, times(0)).onStart(any(ReadableSpan.class)); + multiSpanProcessor.onStart(readWriteSpan); + verify(spanProcessor1).onStart(same(readWriteSpan)); + verify(spanProcessor2, times(0)).onStart(any(ReadWriteSpan.class)); multiSpanProcessor.onEnd(readableSpan); verify(spanProcessor1, times(0)).onEnd(any(ReadableSpan.class)); diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/NoopSpanProcessorTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/NoopSpanProcessorTest.java index cf05800502..ba79030e6b 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/NoopSpanProcessorTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/NoopSpanProcessorTest.java @@ -26,6 +26,7 @@ import org.mockito.MockitoAnnotations; /** Unit tests for {@link NoopSpanProcessorTest}. */ class NoopSpanProcessorTest { @Mock private ReadableSpan readableSpan; + @Mock private ReadWriteSpan readWriteSpan; @BeforeEach void setUp() { @@ -35,7 +36,7 @@ class NoopSpanProcessorTest { @Test void noCrash() { SpanProcessor noopSpanProcessor = NoopSpanProcessor.getInstance(); - noopSpanProcessor.onStart(readableSpan); + noopSpanProcessor.onStart(readWriteSpan); assertThat(noopSpanProcessor.isStartRequired()).isFalse(); noopSpanProcessor.onEnd(readableSpan); assertThat(noopSpanProcessor.isEndRequired()).isFalse(); diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java index 70361ffd91..0841ebb7d4 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/TracerSdkTest.java @@ -160,7 +160,7 @@ class TracerSdkTest { private final AtomicLong numberOfSpansFinished = new AtomicLong(); @Override - public void onStart(ReadableSpan span) { + public void onStart(ReadWriteSpan span) { numberOfSpansStarted.incrementAndGet(); } diff --git a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java index 8c449e21c9..7c0e9da3c6 100644 --- a/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java +++ b/sdk/tracing/src/test/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessorTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import io.opentelemetry.sdk.common.export.CompletableResultCode; import io.opentelemetry.sdk.common.export.ConfigBuilderTest.ConfigTester; +import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.Samplers; import io.opentelemetry.sdk.trace.TestUtils; @@ -52,6 +53,7 @@ class SimpleSpanProcessorTest { private static final long MAX_SCHEDULE_DELAY_MILLIS = 500; private static final String SPAN_NAME = "MySpanName"; @Mock private ReadableSpan readableSpan; + @Mock private ReadWriteSpan readWriteSpan; @Mock private SpanExporter spanExporter; private final TracerSdkProvider tracerSdkFactory = TracerSdkProvider.builder().build(); private final Tracer tracer = tracerSdkFactory.get("SimpleSpanProcessor"); @@ -92,7 +94,7 @@ class SimpleSpanProcessorTest { @Test void onStartSync() { - simpleSampledSpansProcessor.onStart(readableSpan); + simpleSampledSpansProcessor.onStart(readWriteSpan); verifyNoInteractions(spanExporter); } diff --git a/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessor.java b/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessor.java index d4bfd9ab80..8496c93ec7 100644 --- a/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessor.java +++ b/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessor.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import com.lmax.disruptor.SleepingWaitStrategy; import com.lmax.disruptor.WaitStrategy; import io.opentelemetry.sdk.common.export.ConfigBuilder; +import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import java.util.Map; @@ -64,7 +65,7 @@ public final class DisruptorAsyncSpanProcessor implements SpanProcessor { // TODO: Add metrics for dropped spans. @Override - public void onStart(ReadableSpan span) { + public void onStart(ReadWriteSpan span) { if (!startRequired) { return; } diff --git a/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorEventQueue.java b/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorEventQueue.java index ce1e304b3d..93b814b69f 100644 --- a/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorEventQueue.java +++ b/sdk_extensions/async_processor/src/main/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorEventQueue.java @@ -24,6 +24,7 @@ import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import io.opentelemetry.sdk.common.DaemonThreadFactory; +import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import java.util.concurrent.CountDownLatch; @@ -41,18 +42,17 @@ import javax.annotation.concurrent.ThreadSafe; final class DisruptorEventQueue { private static final Logger logger = Logger.getLogger(DisruptorEventQueue.class.getName()); private static final String WORKER_THREAD_NAME = "DisruptorEventQueue_WorkerThread"; - private static final EventTranslatorThreeArg< - DisruptorEvent, EventType, ReadableSpan, CountDownLatch> + private static final EventTranslatorThreeArg TRANSLATOR_THREE_ARG = - new EventTranslatorThreeArg() { + new EventTranslatorThreeArg() { @Override public void translateTo( DisruptorEvent event, long sequence, - EventType arg0, - ReadableSpan arg1, - CountDownLatch arg2) { - event.setEntry(arg0, arg1, arg2); + EventType eventType, + Object span, + CountDownLatch countDownLatch) { + event.setEntry(eventType, span, countDownLatch); } }; private static final EventFactory EVENT_FACTORY = @@ -99,7 +99,7 @@ final class DisruptorEventQueue { this.blocking = blocking; } - void enqueueStartEvent(ReadableSpan span) { + void enqueueStartEvent(ReadWriteSpan span) { if (isShutdown) { if (!loggedShutdownMessage.getAndSet(true)) { logger.info("Attempted to enqueue start event after Disruptor shutdown."); @@ -156,33 +156,31 @@ final class DisruptorEventQueue { } // Enqueues an event on the {@link DisruptorEventQueue}. - private void enqueue(EventType eventType, ReadableSpan readableSpan, CountDownLatch flushLatch) { + private void enqueue(EventType eventType, Object span, CountDownLatch flushLatch) { if (blocking) { - ringBuffer.publishEvent(TRANSLATOR_THREE_ARG, eventType, readableSpan, flushLatch); + ringBuffer.publishEvent(TRANSLATOR_THREE_ARG, eventType, span, flushLatch); } else { // TODO: Record metrics if element not added. - ringBuffer.tryPublishEvent(TRANSLATOR_THREE_ARG, eventType, readableSpan, flushLatch); + ringBuffer.tryPublishEvent(TRANSLATOR_THREE_ARG, eventType, span, flushLatch); } } // An event in the {@link EventQueue}. Just holds a reference to an EventQueue.Entry. private static final class DisruptorEvent { - @Nullable private ReadableSpan readableSpan = null; + @Nullable private Object span = null; @Nullable private EventType eventType = null; @Nullable private CountDownLatch waitingCounter = null; void setEntry( - @Nullable EventType eventType, - @Nullable ReadableSpan readableSpan, - @Nullable CountDownLatch flushLatch) { - this.readableSpan = readableSpan; + @Nullable EventType eventType, @Nullable Object span, @Nullable CountDownLatch flushLatch) { + this.span = span; this.eventType = eventType; this.waitingCounter = flushLatch; } @Nullable - ReadableSpan getReadableSpan() { - return readableSpan; + Object getSpan() { + return span; } @Nullable @@ -206,7 +204,7 @@ final class DisruptorEventQueue { @Override public void onEvent(DisruptorEvent event, long sequence, boolean endOfBatch) { - final ReadableSpan readableSpan = event.getReadableSpan(); + final Object readableSpan = event.getSpan(); final EventType eventType = event.getEventType(); if (eventType == null) { logger.warning("Disruptor enqueued null element type."); @@ -215,10 +213,10 @@ final class DisruptorEventQueue { try { switch (eventType) { case ON_START: - spanProcessor.onStart(readableSpan); + spanProcessor.onStart((ReadWriteSpan) readableSpan); break; case ON_END: - spanProcessor.onEnd(readableSpan); + spanProcessor.onEnd((ReadableSpan) readableSpan); break; case ON_SHUTDOWN: spanProcessor.shutdown(); diff --git a/sdk_extensions/async_processor/src/test/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessorTest.java b/sdk_extensions/async_processor/src/test/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessorTest.java index 495b1843da..2234bf1a0f 100644 --- a/sdk_extensions/async_processor/src/test/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessorTest.java +++ b/sdk_extensions/async_processor/src/test/java/io/opentelemetry/sdk/extensions/trace/export/DisruptorAsyncSpanProcessorTest.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.sdk.common.export.ConfigBuilder; import io.opentelemetry.sdk.trace.MultiSpanProcessor; +import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import java.util.Arrays; @@ -38,6 +39,7 @@ class DisruptorAsyncSpanProcessorTest { private static final boolean NOT_REQUIRED = false; @Mock private ReadableSpan readableSpan; + @Mock private ReadWriteSpan readWriteSpan; @BeforeEach void setUp() { @@ -62,7 +64,7 @@ class DisruptorAsyncSpanProcessorTest { } @Override - public void onStart(ReadableSpan span) { + public void onStart(ReadWriteSpan span) { counterOnStart.incrementAndGet(); } @@ -128,7 +130,7 @@ class DisruptorAsyncSpanProcessorTest { assertThat(disruptorAsyncSpanProcessor.isEndRequired()).isTrue(); assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0); assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0); - disruptorAsyncSpanProcessor.onStart(readableSpan); + disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); disruptorAsyncSpanProcessor.forceFlush(); disruptorAsyncSpanProcessor.shutdown(); @@ -148,7 +150,7 @@ class DisruptorAsyncSpanProcessorTest { assertThat(disruptorAsyncSpanProcessor.isEndRequired()).isTrue(); assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0); assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0); - disruptorAsyncSpanProcessor.onStart(readableSpan); + disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); disruptorAsyncSpanProcessor.forceFlush(); disruptorAsyncSpanProcessor.shutdown(); @@ -168,7 +170,7 @@ class DisruptorAsyncSpanProcessorTest { assertThat(disruptorAsyncSpanProcessor.isEndRequired()).isFalse(); assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0); assertThat(incrementSpanProcessor.getCounterOnEnd()).isEqualTo(0); - disruptorAsyncSpanProcessor.onStart(readableSpan); + disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); disruptorAsyncSpanProcessor.forceFlush(); disruptorAsyncSpanProcessor.shutdown(); @@ -197,7 +199,7 @@ class DisruptorAsyncSpanProcessorTest { DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor = DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build(); disruptorAsyncSpanProcessor.shutdown(); - disruptorAsyncSpanProcessor.onStart(readableSpan); + disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); disruptorAsyncSpanProcessor.forceFlush(); assertThat(incrementSpanProcessor.getCounterOnStart()).isEqualTo(0); @@ -214,7 +216,7 @@ class DisruptorAsyncSpanProcessorTest { DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor = DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build(); for (int i = 1; i <= tenK; i++) { - disruptorAsyncSpanProcessor.onStart(readableSpan); + disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); if (i % 10 == 0) { disruptorAsyncSpanProcessor.forceFlush(); @@ -236,7 +238,7 @@ class DisruptorAsyncSpanProcessorTest { MultiSpanProcessor.create( Arrays.asList(incrementSpanProcessor1, incrementSpanProcessor2))) .build(); - disruptorAsyncSpanProcessor.onStart(readableSpan); + disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); disruptorAsyncSpanProcessor.shutdown(); assertThat(incrementSpanProcessor1.getCounterOnStart()).isEqualTo(1); @@ -256,7 +258,7 @@ class DisruptorAsyncSpanProcessorTest { DisruptorAsyncSpanProcessor disruptorAsyncSpanProcessor = DisruptorAsyncSpanProcessor.newBuilder(incrementSpanProcessor).build(); for (int i = 1; i <= tenK; i++) { - disruptorAsyncSpanProcessor.onStart(readableSpan); + disruptorAsyncSpanProcessor.onStart(readWriteSpan); disruptorAsyncSpanProcessor.onEnd(readableSpan); if (i % 100 == 0) { disruptorAsyncSpanProcessor.forceFlush(); diff --git a/sdk_extensions/zpages/src/main/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessor.java b/sdk_extensions/zpages/src/main/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessor.java index 83325ef91b..c952255b00 100644 --- a/sdk_extensions/zpages/src/main/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessor.java +++ b/sdk_extensions/zpages/src/main/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessor.java @@ -17,6 +17,7 @@ package io.opentelemetry.sdk.extensions.zpages; import io.opentelemetry.sdk.common.export.ConfigBuilder; +import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.SpanProcessor; import io.opentelemetry.trace.SpanId; @@ -64,7 +65,7 @@ final class TracezSpanProcessor implements SpanProcessor { } @Override - public void onStart(ReadableSpan span) { + public void onStart(ReadWriteSpan span) { runningSpanCache.put(span.getSpanContext().getSpanId(), span); } diff --git a/sdk_extensions/zpages/src/test/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessorTest.java b/sdk_extensions/zpages/src/test/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessorTest.java index e32b555352..8efc0bb8b7 100644 --- a/sdk_extensions/zpages/src/test/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessorTest.java +++ b/sdk_extensions/zpages/src/test/java/io/opentelemetry/sdk/extensions/zpages/TracezSpanProcessorTest.java @@ -19,6 +19,7 @@ package io.opentelemetry.sdk.extensions.zpages; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; +import io.opentelemetry.sdk.trace.ReadWriteSpan; import io.opentelemetry.sdk.trace.ReadableSpan; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.trace.SpanContext; @@ -59,14 +60,15 @@ class TracezSpanProcessorTest { } @Mock private ReadableSpan readableSpan; + @Mock private ReadWriteSpan readWriteSpan; @Mock private SpanData spanData; @Test void onStart_sampledSpan_inCache() { TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build(); /* Return a sampled span, which should be added to the running cache by default */ - when(readableSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT); - spanProcessor.onStart(readableSpan); + when(readWriteSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT); + spanProcessor.onStart(readWriteSpan); assertSpanCacheSizes(spanProcessor, 1, 0); } @@ -74,9 +76,12 @@ class TracezSpanProcessorTest { void onEnd_sampledSpan_inCache() { TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build(); /* Return a sampled span, which should be added to the completed cache upon ending */ + when(readWriteSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT); + when(readWriteSpan.getName()).thenReturn(SPAN_NAME); + spanProcessor.onStart(readWriteSpan); + when(readableSpan.getSpanContext()).thenReturn(SAMPLED_SPAN_CONTEXT); when(readableSpan.getName()).thenReturn(SPAN_NAME); - spanProcessor.onStart(readableSpan); when(readableSpan.toSpanData()).thenReturn(spanData); when(spanData.getStatus()).thenReturn(SPAN_STATUS); spanProcessor.onEnd(readableSpan); @@ -87,8 +92,8 @@ class TracezSpanProcessorTest { void onStart_notSampledSpan_inCache() { TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build(); /* Return a non-sampled span, which should not be added to the running cache by default */ - when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); - spanProcessor.onStart(readableSpan); + when(readWriteSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); + spanProcessor.onStart(readWriteSpan); assertSpanCacheSizes(spanProcessor, 1, 0); } @@ -96,8 +101,9 @@ class TracezSpanProcessorTest { void onEnd_notSampledSpan_notInCache() { TracezSpanProcessor spanProcessor = TracezSpanProcessor.newBuilder().build(); /* Return a non-sampled span, which should not be added to the running cache by default */ + when(readWriteSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); - spanProcessor.onStart(readableSpan); + spanProcessor.onStart(readWriteSpan); spanProcessor.onEnd(readableSpan); assertSpanCacheSizes(spanProcessor, 0, 0); } @@ -111,9 +117,10 @@ class TracezSpanProcessorTest { TracezSpanProcessor.newBuilder().readProperties(properties).build(); /* Return a non-sampled span, which should not be added to the completed cache */ - when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); - spanProcessor.onStart(readableSpan); + when(readWriteSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); + spanProcessor.onStart(readWriteSpan); assertSpanCacheSizes(spanProcessor, 1, 0); + when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); spanProcessor.onEnd(readableSpan); assertSpanCacheSizes(spanProcessor, 0, 0); } @@ -127,10 +134,13 @@ class TracezSpanProcessorTest { TracezSpanProcessor.newBuilder().readProperties(properties).build(); /* Return a non-sampled span, which should be added to the caches */ - when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); - when(readableSpan.getName()).thenReturn(SPAN_NAME); - spanProcessor.onStart(readableSpan); + when(readWriteSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); + spanProcessor.onStart(readWriteSpan); + assertSpanCacheSizes(spanProcessor, 1, 0); + + when(readableSpan.getName()).thenReturn(SPAN_NAME); + when(readableSpan.getSpanContext()).thenReturn(NOT_SAMPLED_SPAN_CONTEXT); when(readableSpan.toSpanData()).thenReturn(spanData); when(spanData.getStatus()).thenReturn(SPAN_STATUS); spanProcessor.onEnd(readableSpan);