Avoid linear queue.size() calls in span producers by storing queue size separately (#7141)

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
Anton Rybochkin 2025-03-26 23:21:13 +01:00 committed by GitHub
parent 9698d24fdf
commit e6f90f58ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 19 additions and 13 deletions

View File

@ -64,21 +64,23 @@ public final class JcTools {
* @throws IllegalArgumentException if maxExportBatchSize is negative
*/
@SuppressWarnings("unchecked")
public static <T> void drain(Queue<T> queue, int limit, Consumer<T> consumer) {
public static <T> int drain(Queue<T> queue, int limit, Consumer<T> consumer) {
if (queue instanceof MessagePassingQueue) {
((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
return ((MessagePassingQueue<T>) queue).drain(consumer::accept, limit);
} else {
drainNonJcQueue(queue, limit, consumer);
return drainNonJcQueue(queue, limit, consumer);
}
}
private static <T> void drainNonJcQueue(
private static <T> int drainNonJcQueue(
Queue<T> queue, int maxExportBatchSize, Consumer<T> consumer) {
int polledCount = 0;
T item;
while (polledCount++ < maxExportBatchSize && (item = queue.poll()) != null) {
while (polledCount < maxExportBatchSize && (item = queue.poll()) != null) {
consumer.accept(item);
++polledCount;
}
return polledCount;
}
private JcTools() {}

View File

@ -173,6 +173,7 @@ public final class BatchSpanProcessor implements SpanProcessor {
private long nextExportTime;
private final Queue<ReadableSpan> queue;
private final AtomicInteger queueSize = new AtomicInteger();
// When waiting on the spans queue, exporter thread sets this atomic to the number of more
// spans it needs before doing an export. Writer threads would then wait for the queue to reach
// spansNeeded size before notifying the exporter thread about new entries.
@ -237,7 +238,7 @@ public final class BatchSpanProcessor implements SpanProcessor {
if (!queue.offer(span)) {
processedSpansCounter.add(1, droppedAttrs);
} else {
if (queue.size() >= spansNeeded.get()) {
if (queueSize.incrementAndGet() >= spansNeeded.get()) {
signal.offer(true);
}
}
@ -251,8 +252,7 @@ public final class BatchSpanProcessor implements SpanProcessor {
if (flushRequested.get() != null) {
flush();
}
JcTools.drain(
queue, maxExportBatchSize - batch.size(), span -> batch.add(span.toSpanData()));
drain(maxExportBatchSize - batch.size());
if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
exportCurrentBatch();
@ -274,13 +274,17 @@ public final class BatchSpanProcessor implements SpanProcessor {
}
}
private int drain(int limit) {
int drained = JcTools.drain(queue, limit, span -> batch.add(span.toSpanData()));
queueSize.addAndGet(-drained);
return drained;
}
private void flush() {
int spansToFlush = queue.size();
int spansToFlush = queueSize.get();
while (spansToFlush > 0) {
ReadableSpan span = queue.poll();
assert span != null;
batch.add(span.toSpanData());
spansToFlush--;
int drained = drain(maxExportBatchSize - batch.size());
spansToFlush -= drained;
if (batch.size() >= maxExportBatchSize) {
exportCurrentBatch();
}