diff --git a/dd-trace/src/main/java/datadog/trace/tracer/Tracer.java b/dd-trace/src/main/java/datadog/trace/tracer/Tracer.java index f22b2a2913..b82a39deb9 100644 --- a/dd-trace/src/main/java/datadog/trace/tracer/Tracer.java +++ b/dd-trace/src/main/java/datadog/trace/tracer/Tracer.java @@ -3,7 +3,6 @@ package datadog.trace.tracer; import datadog.trace.api.Config; import datadog.trace.tracer.sampling.AllSampler; import datadog.trace.tracer.sampling.Sampler; -import datadog.trace.tracer.writer.LoggingWriter; import datadog.trace.tracer.writer.Writer; import java.util.ArrayList; import java.util.Collections; @@ -40,7 +39,7 @@ public class Tracer { // TODO: implement and include "standard" interceptors this( config, - new LoggingWriter(), + Writer.Builder.forConfig(config), new AllSampler(), Collections.unmodifiableList(new ArrayList<>(interceptors))); } diff --git a/dd-trace/src/main/java/datadog/trace/tracer/writer/AgentClient.java b/dd-trace/src/main/java/datadog/trace/tracer/writer/AgentClient.java index de25ea1583..bd11b15c04 100644 --- a/dd-trace/src/main/java/datadog/trace/tracer/writer/AgentClient.java +++ b/dd-trace/src/main/java/datadog/trace/tracer/writer/AgentClient.java @@ -16,6 +16,7 @@ import java.net.URL; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.msgpack.jackson.dataformat.MessagePackFactory; @@ -32,18 +33,21 @@ class AgentClient { static final String DATADOG_META_TRACER_VERSION = "Datadog-Meta-Tracer-Version"; static final String X_DATADOG_TRACE_COUNT = "X-Datadog-Trace-Count"; + static final int CONNECT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(1); + static final int READ_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(1); + private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory()); private static final LogRateLimiter 
LOG_RATE_LIMITER = new LogRateLimiter(log, MILLISECONDS_BETWEEN_ERROR_LOG); - private final URL tracesUrl; + @Getter private final URL agentUrl; AgentClient(final String host, final int port) { final String url = "http://" + host + ":" + port + TRACES_ENDPOINT; try { - tracesUrl = new URL(url); + agentUrl = new URL(url); } catch (final MalformedURLException e) { // This should essentially mean agent should bail out from installing, we cannot meaningfully // recover from this. @@ -87,10 +91,16 @@ class AgentClient { } private HttpURLConnection createHttpConnection() throws IOException { - final HttpURLConnection connection = (HttpURLConnection) tracesUrl.openConnection(); + final HttpURLConnection connection = (HttpURLConnection) agentUrl.openConnection(); connection.setDoOutput(true); connection.setDoInput(true); + // It is important to have timeout for agent request here: we need to finish request in some + // reasonable amount + // of time to allow following requests to be run. + connection.setConnectTimeout(CONNECT_TIMEOUT); + connection.setReadTimeout(READ_TIMEOUT); + connection.setRequestMethod("PUT"); connection.setRequestProperty(CONTENT_TYPE, MSGPACK); connection.setRequestProperty(DATADOG_META_LANG, "java"); diff --git a/dd-trace/src/main/java/datadog/trace/tracer/writer/AgentWriter.java b/dd-trace/src/main/java/datadog/trace/tracer/writer/AgentWriter.java new file mode 100644 index 0000000000..4141f54686 --- /dev/null +++ b/dd-trace/src/main/java/datadog/trace/tracer/writer/AgentWriter.java @@ -0,0 +1,168 @@ +package datadog.trace.tracer.writer; + +import datadog.trace.tracer.Trace; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AgentWriter implements Writer { + + /** Maximum number of traces kept in memory */ + static final int DEFAULT_QUEUE_SIZE = 7000; + /** Flush interval for the API in seconds */ + static final long FLUSH_TIME_SECONDS = 1; + /** Maximum amount of time to wait for the scheduler to shut down */ + static final long SHUTDOWN_TIMEOUT_SECONDS = 1; + + private static final ThreadFactory THREAD_FACTORY = + new ThreadFactory() { + @Override + public Thread newThread(final Runnable r) { + final Thread thread = new Thread(r, "dd-agent-writer"); + thread.setDaemon(true); + return thread; + } + }; + + /** Scheduled thread pool, acting like a cron */ + private final ScheduledExecutorService executorService = + Executors.newScheduledThreadPool(1, THREAD_FACTORY); + + private final TracesSendingTask task; + private final ShutdownCallback shutdownCallback; + + public AgentWriter(final AgentClient client) { + this(client, DEFAULT_QUEUE_SIZE); + } + + AgentWriter(final AgentClient client, final int queueSize) { + task = new TracesSendingTask(client, queueSize); + shutdownCallback = new ShutdownCallback(executorService); + } + + /** @return Datadog agent URL. Visible for testing. 
*/ + URL getAgentUrl() { + return task.getClient().getAgentUrl(); + } + + @Override + public void write(final Trace trace) { + if (trace.isValid()) { + if (!task.getQueue().offer(trace)) { + log.debug("Writer queue is full, dropping trace {}", trace); + } + } + } + + @Override + public void incrementTraceCount() { + task.getTraceCount().incrementAndGet(); + } + + @Override + public SampleRateByService getSampleRateByService() { + return task.getSampleRateByService().get(); + } + + @Override + public void start() { + executorService.scheduleAtFixedRate(task, 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS); + try { + Runtime.getRuntime().addShutdownHook(shutdownCallback); + } catch (final IllegalStateException ex) { + // The JVM is already shutting down. + } + } + + @Override + public void close() { + // Perform actions needed to shut down this writer + shutdownCallback.run(); + } + + @Override + public void finalize() { + close(); + } + + /** Task run at a fixed rate that sends all currently queued traces to the agent; it does not wait for traces to arrive. 
*/ + private static final class TracesSendingTask implements Runnable { + + /** The Datadog agent client */ + @Getter private final AgentClient client; + /** Queue size */ + private final int queueSize; + /** In memory collection of traces waiting for departure */ + @Getter private final BlockingQueue queue; + /** Number of traces to be written */ + @Getter private final AtomicInteger traceCount = new AtomicInteger(0); + /** Sample rate by service returned by Datadog agent */ + @Getter + private final AtomicReference sampleRateByService = + new AtomicReference<>(SampleRateByService.EMPTY_INSTANCE); + + TracesSendingTask(final AgentClient client, final int queueSize) { + this.client = client; + this.queueSize = queueSize; + queue = new ArrayBlockingQueue<>(queueSize); + } + + @Override + public void run() { + try { + final List tracesToWrite = new ArrayList<>(queueSize); + queue.drainTo(tracesToWrite); + if (tracesToWrite.size() > 0) { + sampleRateByService.set(client.sendTraces(tracesToWrite, traceCount.getAndSet(0))); + } + } catch (final Throwable e) { + log.debug("Failed to send traces to the API: {}", e.getMessage()); + } + } + } + + /** + * Helper to handle shutting down of the Writer because JVM is shutting down or Writer is closed. + */ + // Visible for testing + static final class ShutdownCallback extends Thread { + + private final ExecutorService executorService; + + public ShutdownCallback(final ExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public void run() { + // We use this logic in two cases: + // * When JVM is shutting down + // * When Writer is closed manually/via GC + // In latter case we need to remove shutdown hook. + try { + Runtime.getRuntime().removeShutdownHook(this); + } catch (final IllegalStateException ex) { + // The JVM may be shutting down. 
+ } + + try { + executorService.shutdownNow(); + executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + log.info("Writer properly closed and async writer interrupted."); + } + } + } +} diff --git a/dd-trace/src/main/java/datadog/trace/tracer/writer/LoggingWriter.java b/dd-trace/src/main/java/datadog/trace/tracer/writer/LoggingWriter.java index b7adb21802..38db25dabd 100644 --- a/dd-trace/src/main/java/datadog/trace/tracer/writer/LoggingWriter.java +++ b/dd-trace/src/main/java/datadog/trace/tracer/writer/LoggingWriter.java @@ -16,6 +16,11 @@ public class LoggingWriter implements Writer { // Nothing to do here. } + @Override + public SampleRateByService getSampleRateByService() { + return SampleRateByService.EMPTY_INSTANCE; + } + @Override public void start() { // TODO: do we really need this? and if we do - who is responsible for calling this? diff --git a/dd-trace/src/main/java/datadog/trace/tracer/writer/SampleRateByService.java b/dd-trace/src/main/java/datadog/trace/tracer/writer/SampleRateByService.java index 675e04a4d0..652017e750 100644 --- a/dd-trace/src/main/java/datadog/trace/tracer/writer/SampleRateByService.java +++ b/dd-trace/src/main/java/datadog/trace/tracer/writer/SampleRateByService.java @@ -12,6 +12,8 @@ import lombok.EqualsAndHashCode; @EqualsAndHashCode class SampleRateByService { + static final SampleRateByService EMPTY_INSTANCE = new SampleRateByService(Collections.EMPTY_MAP); + private final Map rateByService; @JsonCreator diff --git a/dd-trace/src/main/java/datadog/trace/tracer/writer/Writer.java b/dd-trace/src/main/java/datadog/trace/tracer/writer/Writer.java index 1dc07cab7e..e26b86b815 100644 --- a/dd-trace/src/main/java/datadog/trace/tracer/writer/Writer.java +++ b/dd-trace/src/main/java/datadog/trace/tracer/writer/Writer.java @@ -1,6 +1,9 @@ package datadog.trace.tracer.writer; +import datadog.trace.api.Config; import datadog.trace.tracer.Trace; +import 
java.util.Properties; +import lombok.extern.slf4j.Slf4j; /** A writer sends traces to some place. */ public interface Writer { @@ -21,6 +24,9 @@ public interface Writer { */ void incrementTraceCount(); + /** @return Most up to date {@link SampleRateByService} instance. */ + SampleRateByService getSampleRateByService(); + /** Start the writer */ void start(); @@ -29,4 +35,41 @@ public interface Writer { * connections and tasks */ void close(); + + @Slf4j + final class Builder { + + public static Writer forConfig(final Config config) { + if (config == null) { + // There is no way config has not been created, so getting here must be a code bug + throw new NullPointerException("Config is required to create writer"); + } + + final Writer writer; + + final String configuredType = config.getWriterType(); + if (Config.DD_AGENT_WRITER_TYPE.equals(configuredType)) { + writer = createAgentWriter(config); + } else if (Config.LOGGING_WRITER_TYPE.equals(configuredType)) { + writer = new LoggingWriter(); + } else { + log.warn( + "Writer type not configured correctly: Type {} not recognized. 
Defaulting to AgentWriter.", + configuredType); + writer = createAgentWriter(config); + } + + return writer; + } + + public static Writer forConfig(final Properties config) { + return forConfig(Config.get(config)); + } + + private static Writer createAgentWriter(final Config config) { + return new AgentWriter(new AgentClient(config.getAgentHost(), config.getAgentPort())); + } + + private Builder() {} + } } diff --git a/dd-trace/src/test/groovy/datadog/trace/tracer/TracerTest.groovy b/dd-trace/src/test/groovy/datadog/trace/tracer/TracerTest.groovy index 39e67ccdae..4824baa218 100644 --- a/dd-trace/src/test/groovy/datadog/trace/tracer/TracerTest.groovy +++ b/dd-trace/src/test/groovy/datadog/trace/tracer/TracerTest.groovy @@ -2,7 +2,8 @@ package datadog.trace.tracer import datadog.trace.api.Config import datadog.trace.tracer.sampling.AllSampler -import datadog.trace.tracer.writer.LoggingWriter +import datadog.trace.tracer.writer.AgentWriter +import datadog.trace.tracer.writer.SampleRateByService import datadog.trace.tracer.writer.Writer import spock.lang.Shared import spock.lang.Specification @@ -24,7 +25,7 @@ class TracerTest extends Specification { def tracer = new Tracer(config) then: - tracer.getWriter() instanceof LoggingWriter + tracer.getWriter() instanceof AgentWriter tracer.getSampler() instanceof AllSampler tracer.getInterceptors() == [] tracer.getDefaultServiceName() == config.getServiceName() @@ -181,6 +182,11 @@ class TracerTest extends Specification { traceCount.incrementAndGet() } + @Override + SampleRateByService getSampleRateByService() { + return null // Doesn't matter for now + } + @Override void start() { //nothing to do for now diff --git a/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentClientTest.groovy b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentClientTest.groovy index 002b372296..6558fc081a 100644 --- a/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentClientTest.groovy +++ 
b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentClientTest.groovy @@ -126,6 +126,21 @@ class AgentClientTest extends Specification { response == null } + def "test timeout"() { + setup: + stubFor(put(urlEqualTo(AgentClient.TRACES_ENDPOINT)) + .willReturn(aResponse() + .withStatus(200) + .withChunkedDribbleDelay(5, AgentClient.READ_TIMEOUT * 2))) + def trace = createTrace("123") + + when: + def response = client.sendTraces([trace], TRACE_COUNT) + + then: + response == null + } + def "test invalid url"() { when: diff --git a/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentWriterTest.groovy b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentWriterTest.groovy new file mode 100644 index 0000000000..84ac01406e --- /dev/null +++ b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/AgentWriterTest.groovy @@ -0,0 +1,180 @@ +package datadog.trace.tracer.writer + +import datadog.trace.tracer.Trace +import spock.lang.Specification + +import java.util.concurrent.ExecutorService +import java.util.concurrent.TimeUnit + + +class AgentWriterTest extends Specification { + + // Amount of time within which we expect a flush to happen. + // We make this slightly longer than flush time. 
+ private static final int FLUSH_DELAY = TimeUnit.SECONDS.toMillis(AgentWriter.FLUSH_TIME_SECONDS * 2) + + private static final AGENT_URL = new URL("http://example.com") + + def sampleRateByService = Mock(SampleRateByService) + def client = Mock(AgentClient) { + getAgentUrl() >> AGENT_URL + } + + def "test happy path"() { + setup: + def incrementTraceCountBy = 5 + def traces = [ + Mock(Trace) { + isValid() >> true + }, + Mock(Trace) { + isValid() >> false + }, + Mock(Trace) { + isValid() >> true + }] + def writer = new AgentWriter(client) + writer.start() + + when: + for (def trace : traces) { + writer.write(trace) + } + incrementTraceCountBy.times { + writer.incrementTraceCount() + } + Thread.sleep(FLUSH_DELAY) + + then: + 1 * client.sendTraces([traces[0], traces[2]], incrementTraceCountBy) >> sampleRateByService + and: + writer.getSampleRateByService() == sampleRateByService + then: + 0 * client.sendTraces(_, _) + + cleanup: + writer.close() + } + + def "test small queue"() { + setup: + def traces = [ + Mock(Trace) { + isValid() >> true + }, + Mock(Trace) { + isValid() >> true + }] + def writer = new AgentWriter(client, 1) + + when: + for (def trace : traces) { + writer.write(trace) + } + writer.start() + Thread.sleep(FLUSH_DELAY) + + then: + 1 * client.sendTraces([traces[0]], 0) + + cleanup: + writer.close() + } + + def "test client exception handling"() { + setup: + def traces = [ + Mock(Trace) { + isValid() >> true + }, + Mock(Trace) { + isValid() >> true + }] + def writer = new AgentWriter(client) + writer.start() + + when: + writer.write(traces[0]) + Thread.sleep(FLUSH_DELAY) + + then: + 1 * client.sendTraces([traces[0]], 0) >> { throw new IOException("test exception")} + writer.getSampleRateByService() == SampleRateByService.EMPTY_INSTANCE + + when: + writer.write(traces[1]) + Thread.sleep(FLUSH_DELAY) + + then: + 1 * client.sendTraces([traces[1]], 0) >> sampleRateByService + writer.getSampleRateByService() == sampleRateByService + + cleanup: + 
writer.close() + } + + def "test agent url getter"() { + setup: + def writer = new AgentWriter(client) + + when: + def agentUrl = writer.getAgentUrl() + + then: + agentUrl == AGENT_URL + } + + def "test default sample rate by service"() { + setup: + def writer = new AgentWriter(client) + + when: + def sampleRateByService = writer.getSampleRateByService() + + then: + sampleRateByService == SampleRateByService.EMPTY_INSTANCE + } + + def "test start/#closeMethod"() { + setup: + def writer = new AgentWriter(client) + + expect: + !isWriterThreadRunning() + + when: + writer.start() + + then: + isWriterThreadRunning() + + when: + writer."${closeMethod}"() + + then: + !isWriterThreadRunning() + + where: + closeMethod | _ + "close" | _ + "finalize" | _ + } + + def "test shutdown callback"() { + setup: + def executor = Mock(ExecutorService) { + awaitTermination(_, _) >> { throw new InterruptedException() } + } + def callback = new AgentWriter.ShutdownCallback(executor) + + when: + callback.run() + + then: + noExceptionThrown() + } + + boolean isWriterThreadRunning() { + return Thread.getAllStackTraces().keySet().any{ t -> t.getName() == "dd-agent-writer" } + } +} diff --git a/dd-trace/src/test/groovy/datadog/trace/tracer/writer/LoggingWriterTest.groovy b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/LoggingWriterTest.groovy index 805a7141ef..628dde4e12 100644 --- a/dd-trace/src/test/groovy/datadog/trace/tracer/writer/LoggingWriterTest.groovy +++ b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/LoggingWriterTest.groovy @@ -37,4 +37,12 @@ class LoggingWriterTest extends Specification { 1 * trace.toString() } + def "test getter"() { + when: + def sampleRateByInstance = writer.getSampleRateByService() + + then: + sampleRateByInstance == SampleRateByService.EMPTY_INSTANCE + } + } diff --git a/dd-trace/src/test/groovy/datadog/trace/tracer/writer/WriterTest.groovy b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/WriterTest.groovy new file mode 100644 index 
0000000000..1ff4562e10 --- /dev/null +++ b/dd-trace/src/test/groovy/datadog/trace/tracer/writer/WriterTest.groovy @@ -0,0 +1,63 @@ +package datadog.trace.tracer.writer + +import datadog.trace.api.Config +import spock.lang.Specification + + +class WriterTest extends Specification { + + def "test builder logging writer"() { + setup: + def config = Mock(Config) { + getWriterType() >> Config.LOGGING_WRITER_TYPE + } + + when: + def writer = Writer.Builder.forConfig(config) + + then: + writer instanceof LoggingWriter + } + + def "test builder logging writer properties"() { + setup: + def properties = new Properties() + properties.setProperty(Config.WRITER_TYPE, Config.LOGGING_WRITER_TYPE) + + when: + def writer = Writer.Builder.forConfig(properties) + + then: + writer instanceof LoggingWriter + } + + def "test builder agent writer: '#writerType'"() { + setup: + def config = Mock(Config) { + getWriterType() >> writerType + getAgentHost() >> "test.host" + getAgentPort() >> 1234 + } + + when: + def writer = Writer.Builder.forConfig(config) + + then: + writer instanceof AgentWriter + ((AgentWriter) writer).getAgentUrl() == new URL("http://test.host:1234/v0.4/traces"); + + where: + writerType | _ + Config.DD_AGENT_WRITER_TYPE | _ + "some odd string" | _ + } + + def "test builder no config"() { + when: + Writer.Builder.forConfig(null) + + then: + thrown NullPointerException + } + +}