Co-authored-by: Jack Berg <jberg@newrelic.com>
This commit is contained in:
parent
2de5a2c484
commit
31f484f39f
|
@ -11,6 +11,8 @@ import static java.util.Objects.requireNonNull;
|
|||
import io.opentelemetry.api.metrics.MeterProvider;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/**
|
||||
* Builder class for {@link BatchLogRecordProcessor}.
|
||||
|
@ -18,6 +20,8 @@ import java.util.concurrent.TimeUnit;
|
|||
* @since 1.27.0
|
||||
*/
|
||||
public final class BatchLogRecordProcessorBuilder {
|
||||
private static final Logger logger =
|
||||
Logger.getLogger(BatchLogRecordProcessorBuilder.class.getName());
|
||||
|
||||
// Visible for testing
|
||||
static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 1000;
|
||||
|
@ -103,6 +107,9 @@ public final class BatchLogRecordProcessorBuilder {
|
|||
*/
|
||||
public BatchLogRecordProcessorBuilder setMaxQueueSize(int maxQueueSize) {
|
||||
checkArgument(maxQueueSize > 0, "maxQueueSize must be positive.");
|
||||
if (maxExportBatchSize > maxQueueSize) {
|
||||
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
|
||||
}
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
return this;
|
||||
}
|
||||
|
@ -124,6 +131,9 @@ public final class BatchLogRecordProcessorBuilder {
|
|||
*/
|
||||
public BatchLogRecordProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
|
||||
checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive.");
|
||||
if (maxExportBatchSize > maxQueueSize) {
|
||||
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
|
||||
}
|
||||
this.maxExportBatchSize = maxExportBatchSize;
|
||||
return this;
|
||||
}
|
||||
|
@ -150,6 +160,10 @@ public final class BatchLogRecordProcessorBuilder {
|
|||
* @return a new {@link BatchLogRecordProcessor}.
|
||||
*/
|
||||
public BatchLogRecordProcessor build() {
|
||||
if (maxExportBatchSize > maxQueueSize) {
|
||||
maxExportBatchSize = maxQueueSize;
|
||||
logger.log(Level.FINE, "Using maxExportBatchSize: {0}", maxExportBatchSize);
|
||||
}
|
||||
return new BatchLogRecordProcessor(
|
||||
logRecordExporter,
|
||||
meterProvider,
|
||||
|
|
|
@ -10,10 +10,13 @@ import static org.assertj.core.api.Assertions.assertThat;
|
|||
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyList;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.opentelemetry.api.internal.GuardedBy;
|
||||
|
@ -21,6 +24,7 @@ import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
|
|||
import io.opentelemetry.sdk.common.CompletableResultCode;
|
||||
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
|
||||
import io.opentelemetry.sdk.logs.data.LogRecordData;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -117,6 +121,44 @@ class BatchLogRecordProcessorTest {
|
|||
.hasMessage("maxQueueSize must be positive.");
|
||||
}
|
||||
|
||||
@Test
|
||||
void builderAdjustMaxBatchSize() {
|
||||
LogRecordExporter dummyExporter = new CompletableLogRecordExporter();
|
||||
|
||||
BatchLogRecordProcessorBuilder builder =
|
||||
BatchLogRecordProcessor.builder(dummyExporter)
|
||||
.setMaxQueueSize(513)
|
||||
.setMaxExportBatchSize(1000);
|
||||
builder.build();
|
||||
|
||||
assertThat(builder.getMaxExportBatchSize()).isEqualTo(513);
|
||||
assertThat(builder.getMaxQueueSize()).isEqualTo(513);
|
||||
}
|
||||
|
||||
@Test
|
||||
void maxExportBatchSizeExceedsQueueSize() throws InterruptedException {
|
||||
// Given a processor configured with a maxExportBatchSize > maxQueueSize, ensure that after n =
|
||||
// maxQueueSize logs are emitted, export is triggered and that the queue is fully drained and
|
||||
// exported.
|
||||
int maxQueueSize = 2048;
|
||||
when(mockLogRecordExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess());
|
||||
SdkLoggerProvider sdkLoggerProvider =
|
||||
SdkLoggerProvider.builder()
|
||||
.addLogRecordProcessor(
|
||||
BatchLogRecordProcessor.builder(mockLogRecordExporter)
|
||||
.setScheduleDelay(Duration.ofSeconds(Integer.MAX_VALUE))
|
||||
.setMaxExportBatchSize(2049)
|
||||
.setMaxQueueSize(maxQueueSize)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
for (int i = 0; i < maxQueueSize; i++) {
|
||||
emitLog(sdkLoggerProvider, "log " + i);
|
||||
}
|
||||
|
||||
await().untilAsserted(() -> verify(mockLogRecordExporter, times(1)).export(any()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void emitMultipleLogs() {
|
||||
WaitingLogRecordExporter waitingLogRecordExporter =
|
||||
|
|
|
@ -11,9 +11,12 @@ import static java.util.Objects.requireNonNull;
|
|||
import io.opentelemetry.api.metrics.MeterProvider;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
/** Builder class for {@link BatchSpanProcessor}. */
|
||||
public final class BatchSpanProcessorBuilder {
|
||||
private static final Logger logger = Logger.getLogger(BatchSpanProcessorBuilder.class.getName());
|
||||
|
||||
// Visible for testing
|
||||
static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 5000;
|
||||
|
@ -111,6 +114,9 @@ public final class BatchSpanProcessorBuilder {
|
|||
*/
|
||||
public BatchSpanProcessorBuilder setMaxQueueSize(int maxQueueSize) {
|
||||
checkArgument(maxQueueSize > 0, "maxQueueSize must be positive.");
|
||||
if (maxExportBatchSize > maxQueueSize) {
|
||||
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
|
||||
}
|
||||
this.maxQueueSize = maxQueueSize;
|
||||
return this;
|
||||
}
|
||||
|
@ -132,6 +138,9 @@ public final class BatchSpanProcessorBuilder {
|
|||
*/
|
||||
public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
|
||||
checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive.");
|
||||
if (maxExportBatchSize > maxQueueSize) {
|
||||
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
|
||||
}
|
||||
this.maxExportBatchSize = maxExportBatchSize;
|
||||
return this;
|
||||
}
|
||||
|
@ -158,6 +167,10 @@ public final class BatchSpanProcessorBuilder {
|
|||
* @return a new {@link BatchSpanProcessor}.
|
||||
*/
|
||||
public BatchSpanProcessor build() {
|
||||
if (maxExportBatchSize > maxQueueSize) {
|
||||
maxExportBatchSize = maxQueueSize;
|
||||
logger.log(Level.FINE, "Using maxExportBatchSize: {0}", maxExportBatchSize);
|
||||
}
|
||||
return new BatchSpanProcessor(
|
||||
spanExporter,
|
||||
exportUnsampledSpans,
|
||||
|
|
|
@ -14,6 +14,8 @@ import static org.mockito.ArgumentMatchers.anyList;
|
|||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.opentelemetry.api.internal.GuardedBy;
|
||||
|
@ -26,6 +28,7 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider;
|
|||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import io.opentelemetry.sdk.trace.samplers.Sampler;
|
||||
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -126,6 +129,44 @@ class BatchSpanProcessorTest {
|
|||
.hasMessage("maxQueueSize must be positive.");
|
||||
}
|
||||
|
||||
@Test
|
||||
void builderAdjustMaxBatchSize() {
|
||||
SpanExporter dummyExporter = new CompletableSpanExporter();
|
||||
|
||||
BatchSpanProcessorBuilder builder =
|
||||
BatchSpanProcessor.builder(dummyExporter).setMaxQueueSize(513).setMaxExportBatchSize(1000);
|
||||
builder.build();
|
||||
|
||||
assertThat(builder.getMaxExportBatchSize()).isEqualTo(513);
|
||||
assertThat(builder.getMaxQueueSize()).isEqualTo(513);
|
||||
}
|
||||
|
||||
@Test
|
||||
void maxExportBatchSizeExceedsQueueSize() throws InterruptedException {
|
||||
// Given a processor configured with a maxExportBatchSize > maxQueueSize, ensure that after n =
|
||||
// maxQueueSize spans are ended, export is triggered and that the queue is fully drained and
|
||||
// exported.
|
||||
int maxQueueSize = 2048;
|
||||
when(mockSpanExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess());
|
||||
sdkTracerProvider =
|
||||
SdkTracerProvider.builder()
|
||||
.addSpanProcessor(
|
||||
BatchSpanProcessor.builder(mockSpanExporter)
|
||||
.setScheduleDelay(Duration.ofSeconds(Integer.MAX_VALUE))
|
||||
.setMaxExportBatchSize(2049)
|
||||
.setMaxQueueSize(maxQueueSize)
|
||||
.build())
|
||||
.build();
|
||||
|
||||
for (int i = 0; i < maxQueueSize; i++) {
|
||||
createEndedSpan("span " + i);
|
||||
}
|
||||
|
||||
Thread.sleep(10);
|
||||
|
||||
await().untilAsserted(() -> verify(mockSpanExporter, times(1)).export(any()));
|
||||
}
|
||||
|
||||
@Test
|
||||
void startEndRequirements() {
|
||||
BatchSpanProcessor spansProcessor =
|
||||
|
|
Loading…
Reference in New Issue