Add synchronization to SimpleLogRecordProcessor and SimpleSpanProcessor to ensure thread-safe export of logs and spans respectively (#6885)

This commit is contained in:
Saurabh Lodha 2024-11-18 14:54:40 -07:00 committed by GitHub
parent ec3c55ffeb
commit 2a97eaedc5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 16 additions and 2 deletions

View File

@ -41,6 +41,8 @@ public final class SimpleLogRecordProcessor implements LogRecordProcessor {
Collections.newSetFromMap(new ConcurrentHashMap<>()); Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final Object exporterLock = new Object();
/** /**
* Returns a new {@link SimpleLogRecordProcessor} which exports logs to the {@link * Returns a new {@link SimpleLogRecordProcessor} which exports logs to the {@link
* LogRecordExporter} synchronously. * LogRecordExporter} synchronously.
@ -64,7 +66,12 @@ public final class SimpleLogRecordProcessor implements LogRecordProcessor {
public void onEmit(Context context, ReadWriteLogRecord logRecord) { public void onEmit(Context context, ReadWriteLogRecord logRecord) {
try { try {
List<LogRecordData> logs = Collections.singletonList(logRecord.toLogRecordData()); List<LogRecordData> logs = Collections.singletonList(logRecord.toLogRecordData());
CompletableResultCode result = logRecordExporter.export(logs); CompletableResultCode result;
synchronized (exporterLock) {
result = logRecordExporter.export(logs);
}
pendingExports.add(result); pendingExports.add(result);
result.whenComplete( result.whenComplete(
() -> { () -> {

View File

@ -41,6 +41,8 @@ public final class SimpleSpanProcessor implements SpanProcessor {
Collections.newSetFromMap(new ConcurrentHashMap<>()); Collections.newSetFromMap(new ConcurrentHashMap<>());
private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final AtomicBoolean isShutdown = new AtomicBoolean(false);
private final Object exporterLock = new Object();
/** /**
* Returns a new {@link SimpleSpanProcessor} which exports spans to the {@link SpanExporter} * Returns a new {@link SimpleSpanProcessor} which exports spans to the {@link SpanExporter}
* synchronously. * synchronously.
@ -86,7 +88,12 @@ public final class SimpleSpanProcessor implements SpanProcessor {
if (span != null && (exportUnsampledSpans || span.getSpanContext().isSampled())) { if (span != null && (exportUnsampledSpans || span.getSpanContext().isSampled())) {
try { try {
List<SpanData> spans = Collections.singletonList(span.toSpanData()); List<SpanData> spans = Collections.singletonList(span.toSpanData());
CompletableResultCode result = spanExporter.export(spans); CompletableResultCode result;
synchronized (exporterLock) {
result = spanExporter.export(spans);
}
pendingExports.add(result); pendingExports.add(result);
result.whenComplete( result.whenComplete(
() -> { () -> {