From 2a97eaedc55ee4d5edc6f1e998af89d29ddb554d Mon Sep 17 00:00:00 2001 From: Saurabh Lodha Date: Mon, 18 Nov 2024 14:54:40 -0700 Subject: [PATCH] Add synchronization to SimpleLogRecordProcessor and SimpleSpanProcessor to ensure thread-safe export of logs and spans respectively (#6885) --- .../sdk/logs/export/SimpleLogRecordProcessor.java | 9 ++++++++- .../sdk/trace/export/SimpleSpanProcessor.java | 9 ++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java index 5302212345..cc75b50ae1 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java @@ -41,6 +41,8 @@ public final class SimpleLogRecordProcessor implements LogRecordProcessor { Collections.newSetFromMap(new ConcurrentHashMap<>()); private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final Object exporterLock = new Object(); + /** * Returns a new {@link SimpleLogRecordProcessor} which exports logs to the {@link * LogRecordExporter} synchronously. @@ -64,7 +66,12 @@ public final class SimpleLogRecordProcessor implements LogRecordProcessor { public void onEmit(Context context, ReadWriteLogRecord logRecord) { try { List logs = Collections.singletonList(logRecord.toLogRecordData()); - CompletableResultCode result = logRecordExporter.export(logs); + CompletableResultCode result; + + synchronized (exporterLock) { + result = logRecordExporter.export(logs); + } + pendingExports.add(result); result.whenComplete( () -> { diff --git a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java index 68b197a4b5..f543e25353 100644 --- a/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java +++ b/sdk/trace/src/main/java/io/opentelemetry/sdk/trace/export/SimpleSpanProcessor.java @@ -41,6 +41,8 @@ public final class SimpleSpanProcessor implements SpanProcessor { Collections.newSetFromMap(new ConcurrentHashMap<>()); private final AtomicBoolean isShutdown = new AtomicBoolean(false); + private final Object exporterLock = new Object(); + /** * Returns a new {@link SimpleSpanProcessor} which exports spans to the {@link SpanExporter} * synchronously. @@ -86,7 +88,12 @@ public final class SimpleSpanProcessor implements SpanProcessor { if (span != null && (exportUnsampledSpans || span.getSpanContext().isSampled())) { try { List spans = Collections.singletonList(span.toSpanData()); - CompletableResultCode result = spanExporter.export(spans); + CompletableResultCode result; + + synchronized (exporterLock) { + result = spanExporter.export(spans); + } + pendingExports.add(result); result.whenComplete( () -> {