diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java index 40e53302e6..e809acb951 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorBuilder.java @@ -11,6 +11,8 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.api.metrics.MeterProvider; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Builder class for {@link BatchLogRecordProcessor}. @@ -18,6 +20,8 @@ import java.util.concurrent.TimeUnit; * @since 1.27.0 */ public final class BatchLogRecordProcessorBuilder { + private static final Logger logger = + Logger.getLogger(BatchLogRecordProcessorBuilder.class.getName()); // Visible for testing static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 1000; @@ -103,6 +107,9 @@ public final class BatchLogRecordProcessorBuilder { */ public BatchLogRecordProcessorBuilder setMaxQueueSize(int maxQueueSize) { checkArgument(maxQueueSize > 0, "maxQueueSize must be positive."); + if (maxExportBatchSize > maxQueueSize) { + logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize."); + } this.maxQueueSize = maxQueueSize; return this; } @@ -124,6 +131,9 @@ public final class BatchLogRecordProcessorBuilder { */ public BatchLogRecordProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) { checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive."); + if (maxExportBatchSize > maxQueueSize) { + logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize."); + } this.maxExportBatchSize = maxExportBatchSize; return this; } @@ -150,6 +160,10 @@ public final class BatchLogRecordProcessorBuilder { * @return a new {@link BatchLogRecordProcessor}. */ public BatchLogRecordProcessor build() { + if (maxExportBatchSize > maxQueueSize) { + maxExportBatchSize = maxQueueSize; + logger.log(Level.FINE, "Using maxExportBatchSize: {0}", maxExportBatchSize); + } return new BatchLogRecordProcessor( logRecordExporter, meterProvider, diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java index e051eaf175..4f388a5ca4 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java @@ -10,10 +10,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.opentelemetry.api.internal.GuardedBy; @@ -21,6 +24,7 @@ import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.SdkLoggerProvider; import io.opentelemetry.sdk.logs.data.LogRecordData; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -117,6 +121,44 @@ class BatchLogRecordProcessorTest { .hasMessage("maxQueueSize must be positive."); } + @Test + void builderAdjustMaxBatchSize() { + LogRecordExporter dummyExporter = new CompletableLogRecordExporter(); + + BatchLogRecordProcessorBuilder builder = + BatchLogRecordProcessor.builder(dummyExporter) + .setMaxQueueSize(513) + .setMaxExportBatchSize(1000); + builder.build(); + + assertThat(builder.getMaxExportBatchSize()).isEqualTo(513); + assertThat(builder.getMaxQueueSize()).isEqualTo(513); + } + + @Test + void maxExportBatchSizeExceedsQueueSize() throws InterruptedException { + // Given a processor configured with a maxExportBatchSize > maxQueueSize, ensure that after n = + // maxQueueSize logs are emitted, export is triggered and that the queue is fully drained and + // exported. + int maxQueueSize = 2048; + when(mockLogRecordExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess()); + SdkLoggerProvider sdkLoggerProvider = + SdkLoggerProvider.builder() + .addLogRecordProcessor( + BatchLogRecordProcessor.builder(mockLogRecordExporter) + .setScheduleDelay(Duration.ofSeconds(Integer.MAX_VALUE)) + .setMaxExportBatchSize(2049) + .setMaxQueueSize(maxQueueSize) + .build()) + .build(); + + for (int i = 0; i < maxQueueSize; i++) { + emitLog(sdkLoggerProvider, "log " + i); + } + + await().untilAsserted(() -> verify(mockLogRecordExporter, times(1)).export(any())); + } + @Test void emitMultipleLogs() { WaitingLogRecordExporter waitingLogRecordExporter = diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java index 8e9fe970d5..f968e35594 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorBuilder.java @@ -11,9 +11,12 @@ import static java.util.Objects.requireNonNull; import io.opentelemetry.api.metrics.MeterProvider; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; /** Builder class for {@link BatchSpanProcessor}. */ public final class BatchSpanProcessorBuilder { + private static final Logger logger = Logger.getLogger(BatchSpanProcessorBuilder.class.getName()); // Visible for testing static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 5000; @@ -111,6 +114,9 @@ public final class BatchSpanProcessorBuilder { */ public BatchSpanProcessorBuilder setMaxQueueSize(int maxQueueSize) { checkArgument(maxQueueSize > 0, "maxQueueSize must be positive."); + if (maxExportBatchSize > maxQueueSize) { + logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize."); + } this.maxQueueSize = maxQueueSize; return this; } @@ -132,6 +138,9 @@ public final class BatchSpanProcessorBuilder { */ public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) { checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive."); + if (maxExportBatchSize > maxQueueSize) { + logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize."); + } this.maxExportBatchSize = maxExportBatchSize; return this; } @@ -158,6 +167,10 @@ public final class BatchSpanProcessorBuilder { * @return a new {@link BatchSpanProcessor}. */ public BatchSpanProcessor build() { + if (maxExportBatchSize > maxQueueSize) { + maxExportBatchSize = maxQueueSize; + logger.log(Level.FINE, "Using maxExportBatchSize: {0}", maxExportBatchSize); + } return new BatchSpanProcessor( spanExporter, exportUnsampledSpans, diff --git a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java index 23b8874790..89aff56d9c 100644 --- a/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java +++ b/sdk/trace/src/test/java/io/opentelemetry/sdk/trace/export/BatchSpanProcessorTest.java @@ -14,6 +14,8 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.opentelemetry.api.internal.GuardedBy; @@ -26,6 +28,7 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.samplers.Sampler; import io.opentelemetry.sdk.trace.samplers.SamplingResult; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -126,6 +129,44 @@ class BatchSpanProcessorTest { .hasMessage("maxQueueSize must be positive."); } + @Test + void builderAdjustMaxBatchSize() { + SpanExporter dummyExporter = new CompletableSpanExporter(); + + BatchSpanProcessorBuilder builder = + BatchSpanProcessor.builder(dummyExporter).setMaxQueueSize(513).setMaxExportBatchSize(1000); + builder.build(); + + assertThat(builder.getMaxExportBatchSize()).isEqualTo(513); + assertThat(builder.getMaxQueueSize()).isEqualTo(513); + } + + @Test + void maxExportBatchSizeExceedsQueueSize() throws InterruptedException { + // Given a processor configured with a maxExportBatchSize > maxQueueSize, ensure that after n = + // maxQueueSize spans are ended, export is triggered and that the queue is fully drained and + // exported. + int maxQueueSize = 2048; + when(mockSpanExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess()); + sdkTracerProvider = + SdkTracerProvider.builder() + .addSpanProcessor( + BatchSpanProcessor.builder(mockSpanExporter) + .setScheduleDelay(Duration.ofSeconds(Integer.MAX_VALUE)) + .setMaxExportBatchSize(2049) + .setMaxQueueSize(maxQueueSize) + .build()) + .build(); + + for (int i = 0; i < maxQueueSize; i++) { + createEndedSpan("span " + i); + } + + Thread.sleep(10); + + await().untilAsserted(() -> verify(mockSpanExporter, times(1)).export(any())); + } + @Test void startEndRequirements() { BatchSpanProcessor spansProcessor =