diff --git a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Constants.java b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Constants.java index f796e0b671..b1081c90f1 100644 --- a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Constants.java +++ b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/Constants.java @@ -39,6 +39,8 @@ public final class Constants { "org.msgpack", "com.fasterxml.jackson", "org.yaml.snakeyaml", + // disruptor + "com.lmax.disruptor", // okHttp "okhttp3", "okio", diff --git a/dd-trace-ot/dd-trace-ot.gradle b/dd-trace-ot/dd-trace-ot.gradle index 76f49be48c..c4ace1382b 100644 --- a/dd-trace-ot/dd-trace-ot.gradle +++ b/dd-trace-ot/dd-trace-ot.gradle @@ -37,6 +37,7 @@ dependencies { compile group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.16' compile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0' // Last version to support Java7 compile group: 'com.github.jnr', name: 'jnr-unixsocket', version: '0.22' + compile group: 'com.lmax', name: 'disruptor', version: '3.4.2' // We have autoservices defined in test subtree, looks like we need this to be able to properly rebuild this testAnnotationProcessor deps.autoservice diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java index ef821856e3..856a738e5d 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java @@ -40,7 +40,6 @@ import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicInteger; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -89,8 +88,6 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace private final DatadogHttpCodec.Injector injector; private final DatadogHttpCodec.Extractor extractor; - private final AtomicInteger traceCount; - /** By default, report to local agent and collect all traces. */ public DDTracer() { this(Config.get()); @@ -240,12 +237,9 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace if (this.writer instanceof DDAgentWriter) { final DDApi api = ((DDAgentWriter) this.writer).getApi(); - traceCount = api.getTraceCounter(); if (sampler instanceof DDApi.ResponseListener) { api.addResponseListener((DDApi.ResponseListener) this.sampler); } - } else { - traceCount = new AtomicInteger(0); } registerClassLoader(ClassLoader.getSystemClassLoader()); @@ -385,7 +379,7 @@ public class DDTracer implements io.opentracing.Tracer, Closeable, datadog.trace /** Increment the reported trace count, but do not write a trace. */ void incrementTraceCount() { - traceCount.incrementAndGet(); + writer.incrementTraceCount(); } @Override diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/util/DaemonThreadFactory.java b/dd-trace-ot/src/main/java/datadog/trace/common/util/DaemonThreadFactory.java new file mode 100644 index 0000000000..25d08887ce --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/trace/common/util/DaemonThreadFactory.java @@ -0,0 +1,24 @@ +package datadog.trace.common.util; + +import java.util.concurrent.ThreadFactory; + +/** A {@link ThreadFactory} implementation that starts all {@link Thread} as daemons. */ +public final class DaemonThreadFactory implements ThreadFactory { + private final String threadName; + + /** + * Constructs a new {@code DaemonThreadFactory}. + * + * @param threadName used to prefix all thread names. + */ + public DaemonThreadFactory(final String threadName) { + this.threadName = threadName; + } + + @Override + public Thread newThread(final Runnable r) { + final Thread thread = new Thread(r, threadName); + thread.setDaemon(true); + return thread; + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index a8c6e070ec..5a02cf5c36 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -3,103 +3,154 @@ package datadog.trace.common.writer; import static datadog.trace.api.Config.DEFAULT_AGENT_HOST; import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET; import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT; +import static java.util.concurrent.TimeUnit.SECONDS; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.lmax.disruptor.EventFactory; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.EventTranslator; +import com.lmax.disruptor.EventTranslatorOneArg; +import com.lmax.disruptor.SleepingWaitStrategy; +import com.lmax.disruptor.dsl.Disruptor; +import com.lmax.disruptor.dsl.ProducerType; import datadog.opentracing.DDSpan; +import datadog.trace.common.util.DaemonThreadFactory; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; /** - * This writer write provided traces to the a DD agent which is most of time located on the same - * host. + * This writer buffers traces and sends them to the provided DDApi instance. * - *

- * - *

It handles writes asynchronuously so the calling threads are automatically released. However, - * if too much spans are collected the writers can reach a state where it is forced to drop incoming - * spans. + *

Written traces are passed off to a disruptor so as to avoid blocking the application's thread. + * If a flood of traces arrives that exceeds the disruptor ring size, the traces exceeding the + * threshold will be counted and sampled. */ @Slf4j public class DDAgentWriter implements Writer { + private static final int DISRUPTOR_BUFFER_SIZE = 8192; + private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB + private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second - /** Maximum number of traces kept in memory */ - static final int DEFAULT_MAX_TRACES = 7000; - - /** Flush interval for the API in seconds */ - static final long FLUSH_TIME_SECONDS = 1; - - private final ThreadFactory agentWriterThreadFactory = - new ThreadFactory() { + private static final EventTranslatorOneArg>, List> TRANSLATOR = + new EventTranslatorOneArg>, List>() { @Override - public Thread newThread(final Runnable r) { - final Thread thread = new Thread(r, "dd-agent-writer"); - thread.setDaemon(true); - return thread; + public void translateTo( + final Event> event, final long sequence, final List trace) { + event.data = trace; + } + }; + private static final EventTranslator>> FLUSH_TRANSLATOR = + new EventTranslator>>() { + @Override + public void translateTo(final Event> event, final long sequence) { + event.shouldFlush = true; } }; - /** Scheduled thread pool, acting like a cron */ - private final ScheduledExecutorService scheduledExecutor = - Executors.newScheduledThreadPool(1, agentWriterThreadFactory); + private static final ThreadFactory DISRUPTOR_THREAD_FACTORY = + new DaemonThreadFactory("dd-trace-disruptor"); + private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY = + new DaemonThreadFactory("dd-trace-writer"); - /** The DD agent api */ private final DDApi api; - - /** In memory collection of traces waiting for departure */ - private final WriterQueue> traces; - - private boolean queueFullReported = false; + private final int flushFrequencySeconds; + private final Disruptor>> disruptor; + private final ScheduledExecutorService scheduledWriterExecutor; + private final AtomicInteger traceCount = new AtomicInteger(0); + private final AtomicReference> flushSchedule = new AtomicReference<>(); + private final Phaser apiPhaser; + private volatile boolean running = false; public DDAgentWriter() { this(new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET)); } public DDAgentWriter(final DDApi api) { - this(api, new WriterQueue>(DEFAULT_MAX_TRACES)); + this(api, DISRUPTOR_BUFFER_SIZE, FLUSH_PAYLOAD_DELAY); } - public DDAgentWriter(final DDApi api, final WriterQueue> queue) { - super(); - this.api = api; - traces = queue; - } - - /* (non-Javadoc) - * @see datadog.trace.Writer#write(java.util.List) + /** + * Used in the tests. + * + * @param api + * @param disruptorSize Rounded up to next power of 2 + * @param flushFrequencySeconds value < 1 disables scheduled flushes */ + private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) { + this.api = api; + this.flushFrequencySeconds = flushFrequencySeconds; + disruptor = + new Disruptor<>( + new DisruptorEventFactory>(), + Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2 + DISRUPTOR_THREAD_FACTORY, + ProducerType.MULTI, + new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5))); + disruptor.handleEventsWith(new TraceConsumer()); + scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY); + apiPhaser = new Phaser(); // Ensure API calls are completed when flushing + apiPhaser.register(); // Register on behalf of the scheduled executor thread. + } + @Override public void write(final List trace) { - final List removed = traces.add(trace); - if (removed != null && !queueFullReported) { - log.debug("Queue is full, traces will be discarded, queue size: {}", DEFAULT_MAX_TRACES); - queueFullReported = true; - return; + // We can't add events after shutdown otherwise it will never complete shutting down. + if (running) { + final boolean published = disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace); + if (!published) { + // We're discarding the trace, but we still want to count it. + traceCount.incrementAndGet(); + log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace); + } + } else { + log.debug("Trace written after shutdown. Ignoring trace: {}", trace); } - queueFullReported = false; } - /* (non-Javadoc) - * @see Writer#start() - */ + @Override + public void incrementTraceCount() { + traceCount.incrementAndGet(); + } + + public DDApi getApi() { + return api; + } + @Override public void start() { - scheduledExecutor.scheduleAtFixedRate( - new TracesSendingTask(), 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS); + disruptor.start(); + running = true; + scheduleFlush(); } - /* (non-Javadoc) - * @see datadog.trace.Writer#close() - */ @Override public void close() { - scheduledExecutor.shutdownNow(); + running = false; + flush(); + disruptor.shutdown(); + scheduledWriterExecutor.shutdown(); + } + + /** This method will block until the flush is complete. */ + public void flush() { + log.info("Flushing any remaining traces."); + // Register with the phaser so we can block until the flush completion. + apiPhaser.register(); + disruptor.publishEvent(FLUSH_TRANSLATOR); try { - scheduledExecutor.awaitTermination(500, TimeUnit.MILLISECONDS); + // Allow thread to be interrupted. + apiPhaser.awaitAdvanceInterruptibly(apiPhaser.arriveAndDeregister()); } catch (final InterruptedException e) { - log.info("Writer properly closed and async writer interrupted."); + log.warn("Waiting for flush interrupted.", e); } } @@ -108,37 +159,109 @@ public class DDAgentWriter implements Writer { return "DDAgentWriter { api=" + api + " }"; } - public DDApi getApi() { - return api; - } - - class TracesSendingTask implements Runnable { - @Override - public void run() { - try { - if (traces.isEmpty()) { - return; - } - - final List> payload = traces.getAll(); - - if (log.isDebugEnabled()) { - int nbSpans = 0; - for (final List trace : payload) { - nbSpans += trace.size(); - } - log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans); - } - - final boolean isSent = api.sendTraces(payload); - if (isSent) { - log.debug("Successfully sent {} traces to the API", payload.size()); - } else { - log.debug("Failed to send {} traces to the API", payload.size()); - } - } catch (final Throwable e) { - log.debug("Failed to send traces to the API: {}", e.getMessage()); + private void scheduleFlush() { + if (flushFrequencySeconds > 0) { + final ScheduledFuture previous = + flushSchedule.getAndSet( + scheduledWriterExecutor.schedule(flushTask, flushFrequencySeconds, SECONDS)); + if (previous != null) { + previous.cancel(true); } } } + + private final Runnable flushTask = new FlushTask(); + + private class FlushTask implements Runnable { + @Override + public void run() { + // Don't call flush() because it would block the thread also used for sending the traces. + disruptor.publishEvent(FLUSH_TRANSLATOR); + } + } + + /** This class is intentionally not threadsafe. */ + private class TraceConsumer implements EventHandler>> { + private List serializedTraces = new ArrayList<>(); + private int payloadSize = 0; + + @Override + public void onEvent( + final Event> event, final long sequence, final boolean endOfBatch) { + final List trace = event.data; + event.data = null; // clear the event for reuse. + if (trace != null) { + traceCount.incrementAndGet(); + try { + final byte[] serializedTrace = api.serializeTrace(trace); + payloadSize += serializedTrace.length; + serializedTraces.add(serializedTrace); + } catch (final JsonProcessingException e) { + log.warn("Error serializing trace", e); + } catch (final Throwable e) { + log.debug("Error while serializing trace", e); + } + } + if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) { + reportTraces(); + event.shouldFlush = false; + } + } + + private void reportTraces() { + try { + if (serializedTraces.isEmpty()) { + apiPhaser.arrive(); // Allow flush to return + return; + // scheduleFlush called in finally block. + } + final List toSend = serializedTraces; + serializedTraces = new ArrayList<>(toSend.size()); + // ^ Initialize with similar size to reduce arraycopy churn. + + final int representativeCount = traceCount.getAndSet(0); + final int sizeInBytes = payloadSize; + + // Run the actual IO task on a different thread to avoid blocking the consumer. + scheduledWriterExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + final boolean sent = + api.sendSerializedTraces(representativeCount, sizeInBytes, toSend); + if (sent) { + log.debug("Successfully sent {} traces to the API", toSend.size()); + } else { + log.debug( + "Failed to send {} traces (representing {}) of size {} bytes to the API", + toSend.size(), + representativeCount, + sizeInBytes); + } + } catch (final Throwable e) { + log.debug("Failed to send traces to the API: {}", e.getMessage()); + } finally { + apiPhaser.arrive(); // Flush completed. + } + } + }); + } finally { + payloadSize = 0; + scheduleFlush(); + } + } + } + + private static class Event { + private volatile boolean shouldFlush = false; + private volatile T data = null; + } + + private static class DisruptorEventFactory implements EventFactory> { + @Override + public Event newInstance() { + return new Event<>(); + } + } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java index 8d2bdbb509..b31c2b8a46 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java @@ -1,6 +1,7 @@ package datadog.trace.common.writer; import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import datadog.opentracing.DDSpan; @@ -8,11 +9,11 @@ import datadog.opentracing.DDTraceOTInfo; import datadog.trace.common.writer.unixdomainsockets.UnixDomainSocketFactory; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -20,6 +21,9 @@ import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; +import okio.BufferedSink; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessagePacker; import org.msgpack.jackson.dataformat.MessagePackFactory; /** The API pointing to a DD agent */ @@ -38,7 +42,6 @@ public class DDApi { private final List responseListeners = new ArrayList<>(); - private final AtomicInteger traceCount = new AtomicInteger(0); private volatile long nextAllowedLogTime = 0; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory()); @@ -76,10 +79,6 @@ public class DDApi { } } - public AtomicInteger getTraceCounter() { - return traceCount; - } - /** * Send traces to the DD agent * @@ -87,12 +86,63 @@ public class DDApi { * @return the staus code returned */ public boolean sendTraces(final List> traces) { - final int totalSize = traceCount.getAndSet(0); + final List serializedTraces = new ArrayList<>(traces.size()); + int sizeInBytes = 0; + for (final List trace : traces) { + try { + final byte[] serializedTrace = serializeTrace(trace); + sizeInBytes += serializedTrace.length; + serializedTraces.add(serializedTrace); + } catch (final JsonProcessingException e) { + log.warn("Error serializing trace", e); + } + } + + return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces); + } + + byte[] serializeTrace(final List trace) throws JsonProcessingException { + return OBJECT_MAPPER.writeValueAsBytes(trace); + } + + boolean sendSerializedTraces( + final int representativeCount, final Integer sizeInBytes, final List traces) { try { - final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(traces)); + final RequestBody body = + new RequestBody() { + @Override + public MediaType contentType() { + return MSGPACK; + } + + @Override + public long contentLength() { + final int traceCount = traces.size(); + // Need to allocate additional to handle MessagePacker.packArrayHeader + if (traceCount < (1 << 4)) { + return sizeInBytes + 1; // byte + } else if (traceCount < (1 << 16)) { + return sizeInBytes + 2; // short + } else { + return sizeInBytes + 4; // int + } + } + + @Override + public void writeTo(final BufferedSink sink) throws IOException { + final OutputStream out = sink.outputStream(); + final MessagePacker packer = MessagePack.newDefaultPacker(out); + packer.packArrayHeader(traces.size()); + for (final byte[] trace : traces) { + packer.writePayload(trace); + } + packer.close(); + out.close(); + } + }; final Request request = prepareRequest(tracesUrl) - .addHeader(X_DATADOG_TRACE_COUNT, String.valueOf(totalSize)) + .addHeader(X_DATADOG_TRACE_COUNT, String.valueOf(representativeCount)) .put(body) .build(); @@ -100,17 +150,18 @@ public class DDApi { if (response.code() != 200) { if (log.isDebugEnabled()) { log.debug( - "Error while sending {} of {} traces to the DD agent. Status: {}, ResponseMessage: ", + "Error while sending {} of {} traces to the DD agent. Status: {}, Response: {}, Body: {}", traces.size(), - totalSize, + representativeCount, response.code(), - response.message()); + response.message(), + response.body().string()); } else if (nextAllowedLogTime < System.currentTimeMillis()) { nextAllowedLogTime = System.currentTimeMillis() + MILLISECONDS_BETWEEN_ERROR_LOG; log.warn( "Error while sending {} of {} traces to the DD agent. Status: {} (going silent for {} seconds)", traces.size(), - totalSize, + representativeCount, response.code(), response.message(), TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG)); @@ -118,14 +169,18 @@ public class DDApi { return false; } - log.debug("Successfully sent {} of {} traces to the DD agent.", traces.size(), totalSize); + log.debug( + "Successfully sent {} of {} traces to the DD agent.", + traces.size(), + representativeCount); final String responseString = response.body().string().trim(); try { if (!"".equals(responseString) && !"OK".equalsIgnoreCase(responseString)) { final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString); + final String endpoint = tracesUrl.toString(); for (final ResponseListener listener : responseListeners) { - listener.onResponse(tracesUrl.toString(), parsedResponse); + listener.onResponse(endpoint, parsedResponse); } } } catch (final JsonParseException e) { @@ -139,7 +194,7 @@ public class DDApi { "Error while sending " + traces.size() + " of " - + totalSize + + representativeCount + " traces to the DD agent.", e); } else if (nextAllowedLogTime < System.currentTimeMillis()) { @@ -147,7 +202,7 @@ public class DDApi { log.warn( "Error while sending {} of {} traces to the DD agent. {}: {} (going silent for {} minutes)", traces.size(), - totalSize, + representativeCount, e.getClass().getName(), e.getMessage(), TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG)); diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ListWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ListWriter.java index d60ac9bb66..a92343350d 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/ListWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/ListWriter.java @@ -44,6 +44,9 @@ public class ListWriter extends CopyOnWriteArrayList> implements Wr } } + @Override + public void incrementTraceCount() {} + @Override public void start() { close(); @@ -64,6 +67,6 @@ public class ListWriter extends CopyOnWriteArrayList> implements Wr @Override public String toString() { - return "ListWriter { size=" + this.size() + " }"; + return "ListWriter { size=" + size() + " }"; } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/LoggingWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/LoggingWriter.java index 0da4839d1d..cb95d07a72 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/LoggingWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/LoggingWriter.java @@ -20,6 +20,11 @@ public class LoggingWriter implements Writer { } } + @Override + public void incrementTraceCount() { + log.info("incrementTraceCount()"); + } + @Override public void close() { log.info("close()"); diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java index 324e289da5..306074ae3e 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/Writer.java @@ -2,12 +2,13 @@ package datadog.trace.common.writer; import datadog.opentracing.DDSpan; import datadog.trace.api.Config; +import java.io.Closeable; import java.util.List; import java.util.Properties; import lombok.extern.slf4j.Slf4j; /** A writer is responsible to send collected spans to some place */ -public interface Writer { +public interface Writer extends Closeable { /** * Write a trace represented by the entire list of all the finished spans @@ -23,8 +24,12 @@ public interface Writer { * Indicates to the writer that no future writing will come and it should terminates all * connections and tasks */ + @Override void close(); + /** Count that a trace was captured for stats, but without reporting it. */ + void incrementTraceCount(); + @Slf4j final class Builder { diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/WriterQueue.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/WriterQueue.java deleted file mode 100644 index 1ead96a243..0000000000 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/WriterQueue.java +++ /dev/null @@ -1,95 +0,0 @@ -package datadog.trace.common.writer; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -/** - * A bounded queue implementation compatible with the Datadog agent behavior. The class is - * thread-safe and can be used with concurrency. - * - *

- * - *

- * - *

- * - *

This class implements a specific behavior when it's full. Each new item added will replace an - * exisiting one, at a random place/index. The class is backed by an ArrayList in order to perform - * efficient random remove. - * - * @param The element type to store - */ -class WriterQueue { - - private final int capacity; - private volatile ArrayList list; - - /** - * Default construct, a capacity must be provided - * - * @param capacity the max size of the queue - */ - WriterQueue(final int capacity) { - if (capacity < 1) { - throw new IllegalArgumentException("Capacity couldn't be 0"); - } - this.list = emptyList(capacity); - this.capacity = capacity; - } - - /** - * Return a list containing all elements present in the queue. After the operation, the queue is - * reset. All action performed on the returned list has no impact to the queue - * - * @return a list contain all elements - */ - public synchronized List getAll() { - final List all = list; - list = emptyList(capacity); - return all; - } - - /** - * Add an element to the queue. If the queue is full, set the element at a random place in the - * queue and return the previous one. - * - * @param element the element to add to the queue - * @return null if the queue is not full, otherwise the removed element - */ - public synchronized T add(final T element) { - - T removed = null; - if (list.size() < capacity) { - list.add(element); - } else { - final int index = ThreadLocalRandom.current().nextInt(0, list.size()); - removed = list.set(index, element); - } - return removed; - } - - // Methods below are essentially used for testing purposes - - /** - * Return the number of elements set in the queue - * - * @return the current size of the queue - */ - public int size() { - return list.size(); - } - - /** - * Return true if the queue is empty - * - * @return true if the queue is empty - */ - public boolean isEmpty() { - return list.isEmpty(); - } - - private ArrayList emptyList(final int capacity) { - return new ArrayList<>(capacity); - } -} diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/PendingTraceTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/PendingTraceTest.groovy index 92ced670b8..0f61eb90f3 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/PendingTraceTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/PendingTraceTest.groovy @@ -1,6 +1,5 @@ package datadog.opentracing - import datadog.trace.api.Config import datadog.trace.common.writer.ListWriter import datadog.trace.util.gc.GCUtils @@ -10,13 +9,19 @@ import spock.lang.Timeout import java.lang.ref.WeakReference import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger import static datadog.trace.api.Config.PARTIAL_FLUSH_MIN_SPANS class PendingTraceTest extends Specification { - def writer = new ListWriter() + def traceCount = new AtomicInteger() + def writer = new ListWriter() { + @Override + void incrementTraceCount() { + PendingTraceTest.this.traceCount.incrementAndGet() + } + } def tracer = new DDTracer(writer) - def traceCount = tracer.traceCount def traceId = System.identityHashCode(this) String traceIdStr = String.valueOf(traceId) @@ -207,7 +212,7 @@ class PendingTraceTest extends Specification { trace.weakReferences.size() == 2 trace.asList() == [rootSpan] writer == [] - tracer.traceCount.get() == 0 + traceCount.get() == 0 when: child1.finish() @@ -217,7 +222,7 @@ class PendingTraceTest extends Specification { trace.weakReferences.size() == 1 trace.asList() == [rootSpan] writer == [[child1]] - tracer.traceCount.get() == 1 + traceCount.get() == 1 when: child2.finish() @@ -227,7 +232,7 @@ class PendingTraceTest extends Specification { trace.weakReferences.size() == 0 trace.asList() == [child2, rootSpan] writer == [[child1], [child2, rootSpan]] - tracer.traceCount.get() == 2 + traceCount.get() == 2 } def "partial flush with root span closed last"() { @@ -253,7 +258,7 @@ class PendingTraceTest extends Specification { trace.weakReferences.size() == 2 trace.asList() == [child1] writer == [] - tracer.traceCount.get() == 0 + traceCount.get() == 0 when: child2.finish() @@ -263,7 +268,7 @@ class PendingTraceTest extends Specification { trace.weakReferences.size() == 1 trace.asList() == [] writer == [[child2, child1]] - tracer.traceCount.get() == 1 + traceCount.get() == 1 when: rootSpan.finish() @@ -273,6 +278,6 @@ class PendingTraceTest extends Specification { trace.weakReferences.size() == 0 trace.asList() == [rootSpan] writer == [[child2, child1], [rootSpan]] - tracer.traceCount.get() == 2 + traceCount.get() == 2 } } diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/SpanFactory.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/SpanFactory.groovy index f57e6dad93..c35d5599a8 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/SpanFactory.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/SpanFactory.groovy @@ -4,9 +4,11 @@ import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.ListWriter class SpanFactory { - static newSpanOf(long timestampMicro) { + static newSpanOf(long timestampMicro, String threadName = Thread.currentThread().name) { def writer = new ListWriter() def tracer = new DDTracer(writer) + def currentThreadName = Thread.currentThread().getName() + Thread.currentThread().setName(threadName) def context = new DDSpanContext( "1", "1", @@ -22,6 +24,7 @@ class SpanFactory { Collections.emptyMap(), new PendingTrace(tracer, "1", [:]), tracer) + Thread.currentThread().setName(currentThreadName) return new DDSpan(timestampMicro, context) } diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy index 92a5501887..4ff52ea087 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy @@ -14,17 +14,32 @@ import spock.lang.Subject import spock.lang.Timeout import java.lang.ref.WeakReference -import java.util.concurrent.TimeUnit +import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference +import static java.util.concurrent.TimeUnit.SECONDS + class ScopeManagerTest extends Specification { - def writer = new ListWriter() - def tracer = new DDTracer(writer) + def latch + def writer + def tracer @Subject - def scopeManager = tracer.scopeManager() + def scopeManager + + def setup() { + latch = new CountDownLatch(1) + final currentLatch = latch + writer = new ListWriter() { + void incrementTraceCount() { + currentLatch.countDown() + } + } + tracer = new DDTracer(writer) + scopeManager = tracer.scopeManager() + } def cleanup() { scopeManager.tlsScope.remove() @@ -255,13 +270,12 @@ class ScopeManagerTest extends Specification { writer == [[scope.span()]] } - @Timeout(value = 60, unit = TimeUnit.SECONDS) + @Timeout(value = 60, unit = SECONDS) def "hard reference on continuation prevents trace from reporting"() { setup: def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(false) def span = scope.span() - def traceCount = ((DDSpan) span).context().tracer.traceCount scope.setAsyncPropagation(true) def continuation = scope.capture() scope.close() @@ -277,9 +291,7 @@ class ScopeManagerTest extends Specification { def continuationRef = new WeakReference<>(continuation) continuation = null // Continuation references also hold up traces. GCUtils.awaitGC(continuationRef) - while (traceCount.get() == 0) { - // wait until trace count increments or timeout expires - } + latch.await(60, SECONDS) } if (autoClose) { if (continuation != null) { @@ -289,7 +301,6 @@ class ScopeManagerTest extends Specification { then: forceGC ? true : writer == [[span]] - traceCount.get() == 1 where: autoClose | forceGC diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy index 216e47ff75..4c1153cf10 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/DDTracerTest.groovy @@ -132,7 +132,7 @@ class DDTracerTest extends Specification { expect: tracer.writer instanceof DDAgentWriter - tracer.traceCount.is(((DDAgentWriter) tracer.writer).getApi().traceCount) + tracer.writer.traceCount.is(((DDAgentWriter) tracer.writer).traceCount) where: key | value diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy index ffda905c00..e64e3e2205 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy @@ -1,90 +1,196 @@ package datadog.trace.api.writer import datadog.opentracing.DDSpan +import datadog.opentracing.DDSpanContext +import datadog.opentracing.DDTracer +import datadog.opentracing.PendingTrace +import datadog.trace.api.sampling.PrioritySampling import datadog.trace.common.writer.DDAgentWriter import datadog.trace.common.writer.DDApi -import datadog.trace.common.writer.WriterQueue import spock.lang.Specification +import spock.lang.Timeout + +import java.util.concurrent.TimeUnit import static datadog.opentracing.SpanFactory.newSpanOf -import static org.mockito.Mockito.mock -import static org.mockito.Mockito.verifyNoMoreInteractions +import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE +@Timeout(20) class DDAgentWriterTest extends Specification { + def api = Mock(DDApi) - def "calls to the API are scheduled"() { - + def "test happy path"() { setup: - def api = Mock(DDApi) - def writer = new DDAgentWriter(api) - - when: + def writer = new DDAgentWriter(api, 2, -1) writer.start() - Thread.sleep(flush_time_wait) - - then: - 0 * api.sendTraces(_ as List) - - when: - for (def i = 0; i < tick; i++) { - writer.write(trace) - Thread.sleep(flush_time_wait) - } - - then: - tick * api.sendTraces([trace]) - - where: - trace = [newSpanOf(0)] - flush_time_wait = (int) (1.2 * (DDAgentWriter.FLUSH_TIME_SECONDS * 1_000)) - tick << [1, 3] - } - - def "check if trace has been added by force"() { - - setup: - def traces = new WriterQueue>(capacity) - def writer = new DDAgentWriter(Mock(DDApi), traces) - - when: - for (def i = 0; i < capacity; i++) { - writer.write([]) - } - - then: - traces.size() == capacity when: writer.write(trace) + writer.write(trace) + writer.flush() then: - traces.size() == capacity - traces.getAll().contains(trace) + 2 * api.serializeTrace(_) >> { trace -> callRealMethod() } + 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) + 0 * _ + + cleanup: + writer.close() where: - trace = [newSpanOf(0)] - capacity = 10 + trace = [newSpanOf(0, "fixed-thread-name")] + } + def "test flood of traces"() { + setup: + def writer = new DDAgentWriter(api, disruptorSize, -1) + writer.start() + when: + (1..traceCount).each { + writer.write(trace) + } + writer.flush() + + then: + _ * api.serializeTrace(_) >> { trace -> callRealMethod() } + 1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount }) + 0 * _ + + cleanup: + writer.close() + + where: + trace = [newSpanOf(0, "fixed-thread-name")] + disruptorSize = 2 + traceCount = 100 // Shouldn't trigger payload, but bigger than the disruptor size. + } + + def "test flush by size"() { + setup: + def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1) + def phaser = writer.apiPhaser + writer.start() + phaser.register() + + when: + (1..6).each { + writer.write(trace) + } + // Wait for 2 flushes of 3 by size + phaser.awaitAdvanceInterruptibly(phaser.arrive()) + phaser.awaitAdvanceInterruptibly(phaser.arriveAndDeregister()) + + then: + 6 * api.serializeTrace(_) >> { trace -> callRealMethod() } + 2 * api.sendSerializedTraces(3, _, { it.size() == 3 }) + + when: + (1..2).each { + writer.write(trace) + } + // Flush the remaining 2 + writer.flush() + + then: + 2 * api.serializeTrace(_) >> { trace -> callRealMethod() } + 1 * api.sendSerializedTraces(2, _, { it.size() == 2 }) + 0 * _ + + cleanup: + writer.close() + + where: + span = [newSpanOf(0, "fixed-thread-name")] + trace = (0..10000).collect { span } + } + + def "test flush by time"() { + setup: + def writer = new DDAgentWriter(api) + def phaser = writer.apiPhaser + phaser.register() + writer.start() + writer.flush() + + when: + (1..5).each { + writer.write(trace) + } + phaser.awaitAdvanceInterruptibly(phaser.arriveAndDeregister()) + + then: + 5 * api.serializeTrace(_) >> { trace -> callRealMethod() } + 1 * api.sendSerializedTraces(5, _, { it.size() == 5 }) + 0 * _ + + cleanup: + writer.close() + + where: + span = [newSpanOf(0, "fixed-thread-name")] + trace = (1..10).collect { span } + } + + def "test default buffer size"() { + setup: + def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1) + writer.start() + + when: + (0..maxedPayloadTraceCount).each { + writer.write(minimalTrace) + def start = System.nanoTime() + // (consumer processes a trace in about 20 microseconds + while (System.nanoTime() - start < TimeUnit.MICROSECONDS.toNanos(100)) { + // Busywait because we don't want to fill up the ring buffer + } + } + writer.flush() + + then: + (maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() } + 1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount }) + + cleanup: + writer.close() + + where: + minimalContext = new DDSpanContext( + "1", + "1", + "0", + "", + "", + "", + PrioritySampling.UNSET, + "", + Collections.emptyMap(), + false, + "", + Collections.emptyMap(), + Mock(PendingTrace), + Mock(DDTracer)) + minimalSpan = new DDSpan(0, minimalContext) + minimalTrace = [minimalSpan] + traceSize = DDApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length + maxedPayloadTraceCount = ((int) (DDAgentWriter.FLUSH_PAYLOAD_BYTES / traceSize)) + 1 } def "check that are no interactions after close"() { setup: - def api = mock(DDApi) def writer = new DDAgentWriter(api) writer.start() when: writer.close() writer.write([]) - Thread.sleep(flush_time_wait) + writer.flush() then: - verifyNoMoreInteractions(api) - - where: - flush_time_wait = (int) (1.2 * (DDAgentWriter.FLUSH_TIME_SECONDS * 1_000)) + 0 * _ + writer.traceCount.get() == 0 } } diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy index 1cd0c7372c..f77e7f4b67 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy @@ -2,11 +2,9 @@ package datadog.trace.api.writer import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.ObjectMapper import datadog.opentracing.SpanFactory import datadog.trace.common.writer.DDApi import datadog.trace.common.writer.DDApi.ResponseListener -import org.msgpack.jackson.dataformat.MessagePackFactory import spock.lang.Specification import java.util.concurrent.atomic.AtomicReference @@ -14,15 +12,20 @@ import java.util.concurrent.atomic.AtomicReference import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer class DDApiTest extends Specification { - static mapper = new ObjectMapper(new MessagePackFactory()) + static mapper = DDApi.OBJECT_MAPPER def "sending an empty list of traces returns no errors"() { setup: def agent = httpServer { handlers { put("v0.4/traces") { - def status = request.contentLength > 0 ? 200 : 500 - response.status(status).send() + if (request.contentType != "application/msgpack") { + response.status(400).send("wrong type: $request.contentType") + } else if (request.contentLength <= 0) { + response.status(400).send("no content") + } else { + response.status(200).send() + } } } } @@ -68,7 +71,6 @@ class DDApiTest extends Specification { expect: client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces" - client.getTraceCounter().addAndGet(traces.size()) >= 0 client.sendTraces(traces) agent.lastRequest.contentType == "application/msgpack" agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java" @@ -82,9 +84,9 @@ class DDApiTest extends Specification { // Populate thread info dynamically as it is different when run via gradle vs idea. where: - traces | expectedRequestBody - [] | [] - [SpanFactory.newSpanOf(1L).setTag("service.name", "my-service")] | [new TreeMap<>([ + traces | expectedRequestBody + [] | [] + [[SpanFactory.newSpanOf(1L).setTag("service.name", "my-service")]] | [[new TreeMap<>([ "duration" : 0, "error" : 0, "meta" : ["thread.name": Thread.currentThread().getName(), "thread.id": "${Thread.currentThread().id}"], @@ -97,8 +99,8 @@ class DDApiTest extends Specification { "start" : 1000, "trace_id" : 1, "type" : "fakeType" - ])] - [SpanFactory.newSpanOf(100L).setTag("resource.name", "my-resource")] | [new TreeMap<>([ + ])]] + [[SpanFactory.newSpanOf(100L).setTag("resource.name", "my-resource")]] | [[new TreeMap<>([ "duration" : 0, "error" : 0, "meta" : ["thread.name": Thread.currentThread().getName(), "thread.id": "${Thread.currentThread().id}"], @@ -111,7 +113,7 @@ class DDApiTest extends Specification { "start" : 100000, "trace_id" : 1, "type" : "fakeType" - ])] + ])]] } def "Api ResponseListeners see 200 responses"() { @@ -130,18 +132,15 @@ class DDApiTest extends Specification { } def client = new DDApi("localhost", agent.address.port, null) client.addResponseListener(responseListener) - def traceCounter = client.getTraceCounter() - traceCounter.set(3) when: - client.sendTraces([]) + client.sendTraces([[], [], []]) then: agentResponse.get() == '{"hello":"test"}' agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java" agent.lastRequest.headers.get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown") agent.lastRequest.headers.get("Datadog-Meta-Tracer-Version") == "Stubbed-Test-Version" agent.lastRequest.headers.get("X-Datadog-Trace-Count") == "3" // false data shows the value provided via traceCounter. - traceCounter.get() == 0 cleanup: agent.close() @@ -200,11 +199,7 @@ class DDApiTest extends Specification { "v0.3" | 30000 | false } - static List> convertList(byte[] bytes) { - return mapper.readValue(bytes, new TypeReference>>() {}) - } - - static TreeMap convertMap(byte[] bytes) { - return mapper.readValue(bytes, new TypeReference>() {}) + static List>> convertList(byte[] bytes) { + return mapper.readValue(bytes, new TypeReference>>>() {}) } } diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/SerializationTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/SerializationTest.groovy new file mode 100644 index 0000000000..1c8845f1d0 --- /dev/null +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/SerializationTest.groovy @@ -0,0 +1,62 @@ +package datadog.trace.api.writer + + +import com.fasterxml.jackson.core.type.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper +import org.msgpack.core.MessagePack +import org.msgpack.jackson.dataformat.MessagePackFactory +import spock.lang.Shared +import spock.lang.Specification + +import static java.util.Collections.singletonMap + +class SerializationTest extends Specification { + @Shared + def jsonMapper = new ObjectMapper() + @Shared + def mpMapper = new ObjectMapper(new MessagePackFactory()) + + + def "test json mapper serialization"() { + setup: + def map = ["key1": "val1"] + def serializedMap = mapper.writeValueAsBytes(map) + def serializedList = "[${new String(serializedMap)}]".getBytes() + + when: + def result = mapper.readValue(serializedList, new TypeReference>>() {}) + + then: + result == [map] + new String(serializedList) == '[{"key1":"val1"}]' + + where: + mapper = jsonMapper + } + + def "test msgpack mapper serialization"() { + setup: + def serializedMaps = input.collect { + mapper.writeValueAsBytes(it) + } + + def packer = MessagePack.newDefaultBufferPacker() + packer.packArrayHeader(serializedMaps.size()) + serializedMaps.each { + packer.writePayload(it) + } + def serializedList = packer.toByteArray() + + when: + def result = mapper.readValue(serializedList, new TypeReference>>() {}) + + then: + result == input + + where: + mapper = mpMapper + + // GStrings get odd results in the serializer. + input = (1..1).collect { singletonMap("key$it".toString(), "val$it".toString()) } + } +} diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/WriterQueueTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/WriterQueueTest.groovy deleted file mode 100644 index 13201b44dc..0000000000 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/WriterQueueTest.groovy +++ /dev/null @@ -1,181 +0,0 @@ -package datadog.trace.api.writer - -import datadog.trace.common.writer.WriterQueue -import spock.lang.Specification - -import java.util.concurrent.Phaser -import java.util.concurrent.atomic.AtomicInteger - -class WriterQueueTest extends Specification { - - def "instantiate a empty queue throws an exception"() { - when: - new WriterQueue(0) - - then: - thrown IllegalArgumentException - - when: - new WriterQueue(-1) - - then: - thrown IllegalArgumentException - } - - def "full the queue without forcing"() { - - setup: - def queue = new WriterQueue(capacity) - def removed = false - - when: - for (def i = 0; i < capacity; i++) { - removed = removed || queue.add(i) != null - } - - then: - !removed - - where: - capacity << [1, 10, 100] - - } - - def "force element add to a full queue"() { - - setup: - def queue = new WriterQueue(capacity) - for (def i = 0; i < capacity; i++) { - queue.add(i) - } - - when: - def removed = queue.add(1) - - then: - removed != null - queue.size() == capacity - - where: - capacity << [1, 10, 100] - - } - - def "drain the queue into another collection"() { - - setup: - def queue = new WriterQueue(capacity) - for (def i = 0; i < capacity; i++) { - queue.add(i) - } - - when: - def list = queue.getAll() - - then: - list.size() == capacity - queue.isEmpty() - queue.size() == 0 - - where: - capacity << [1, 10, 100] - - } - - def "check concurrency on writes"() { - setup: - - def phaser1 = new Phaser() - def phaser2 = new Phaser() - def queue = new WriterQueue(capacity) - def insertionCount = new AtomicInteger(0) - - phaser1.register() // global start - phaser2.register() // global stop - - numberThreads.times { - phaser1.register() - Thread.start { - phaser2.register() - phaser1.arriveAndAwaitAdvance() - numberInsertionsPerThread.times { - queue.add(1) - insertionCount.getAndIncrement() - } - phaser2.arriveAndAwaitAdvance() - } - } - - when: - phaser1.arriveAndAwaitAdvance() // allow threads to start - phaser2.arriveAndAwaitAdvance() // wait till the job is not finished - - then: - queue.size() == capacity - insertionCount.get() == numberInsertionsPerThread * numberThreads - - where: - capacity = 100 - numberThreads << [1, 10, 100] - numberInsertionsPerThread = 100 - } - - - def "check concurrency on writes and reads"() { - setup: - def phaser1 = new Phaser() - def phaser2 = new Phaser() - def queue = new WriterQueue(capacity) - def insertionCount = new AtomicInteger(0) - def droppedCount = new AtomicInteger(0) - def getCount = new AtomicInteger(0) - def numberElements = new AtomicInteger(0) - - phaser1.register() // global start - phaser2.register() // global stop - - // writes - numberThreadsWrites.times { - phaser1.register() - Thread.start { - phaser2.register() - phaser1.arriveAndAwaitAdvance() - numberInsertionsPerThread.times { - queue.add(1) != null ? droppedCount.getAndIncrement() : null - insertionCount.getAndIncrement() - } - phaser2.arriveAndAwaitAdvance() - } - } - - // reads - numberThreadsReads.times { - phaser1.register() - Thread.start { - phaser2.register() - phaser1.arriveAndAwaitAdvance() - numberGetsPerThread.times { - numberElements.getAndAdd(queue.getAll().size()) - getCount.getAndIncrement() - } - phaser2.arriveAndAwaitAdvance() - } - } - - when: - phaser1.arriveAndAwaitAdvance() // allow threads to start - phaser2.arriveAndAwaitAdvance() // wait till the job is not finished - - then: - insertionCount.get() == numberInsertionsPerThread * numberThreadsWrites - getCount.get() == numberGetsPerThread * numberThreadsReads - insertionCount.get() == numberElements + queue.size() + droppedCount - - where: - capacity = 100 - numberThreadsWrites << [1, 10, 100] - numberThreadsReads << [1, 5, 10] - numberInsertionsPerThread = 100 - numberGetsPerThread = 5 - } -} diff --git a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy index 0bb0fd3b40..a3b80996b5 100644 --- a/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy +++ b/dd-trace-ot/src/traceAgentTest/groovy/DDApiIntegrationTest.groovy @@ -151,18 +151,6 @@ class DDApiIntegrationTest { [[new DDSpan(1, CONTEXT)]] | 3 [[new DDSpan(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()), CONTEXT)]] | 4 } - - def "Sending bad trace fails (test #test)"() { - expect: - api.sendTraces(traces) == false - - where: - traces | test - [""] | 1 - ["", 123] | 2 - [[:]] | 3 - [new Object()] | 4 - } } @Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })