Merge pull request #655 from DataDog/mar-kolya/new-api-agent-writer

Mar kolya/new api agent writer
This commit is contained in:
Nikolay Martynov 2019-01-22 17:12:37 -05:00 committed by GitHub
commit 62998ff006
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 506 additions and 7 deletions

View File

@ -3,7 +3,6 @@ package datadog.trace.tracer;
import datadog.trace.api.Config;
import datadog.trace.tracer.sampling.AllSampler;
import datadog.trace.tracer.sampling.Sampler;
import datadog.trace.tracer.writer.LoggingWriter;
import datadog.trace.tracer.writer.Writer;
import java.util.ArrayList;
import java.util.Collections;
@ -40,7 +39,7 @@ public class Tracer {
// TODO: implement and include "standard" interceptors
this(
config,
new LoggingWriter(),
Writer.Builder.forConfig(config),
new AllSampler(),
Collections.unmodifiableList(new ArrayList<>(interceptors)));
}

View File

@ -16,6 +16,7 @@ import java.net.URL;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.msgpack.jackson.dataformat.MessagePackFactory;
@ -32,18 +33,21 @@ class AgentClient {
static final String DATADOG_META_TRACER_VERSION = "Datadog-Meta-Tracer-Version";
static final String X_DATADOG_TRACE_COUNT = "X-Datadog-Trace-Count";
static final int CONNECT_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(1);
static final int READ_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(1);
private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
private static final LogRateLimiter LOG_RATE_LIMITER =
new LogRateLimiter(log, MILLISECONDS_BETWEEN_ERROR_LOG);
private final URL tracesUrl;
@Getter private final URL agentUrl;
AgentClient(final String host, final int port) {
final String url = "http://" + host + ":" + port + TRACES_ENDPOINT;
try {
tracesUrl = new URL(url);
agentUrl = new URL(url);
} catch (final MalformedURLException e) {
// This should essentially mean agent should bail out from installing, we cannot meaningfully
// recover from this.
@ -87,10 +91,16 @@ class AgentClient {
}
private HttpURLConnection createHttpConnection() throws IOException {
final HttpURLConnection connection = (HttpURLConnection) tracesUrl.openConnection();
final HttpURLConnection connection = (HttpURLConnection) agentUrl.openConnection();
connection.setDoOutput(true);
connection.setDoInput(true);
// It is important to have timeout for agent request here: we need to finish request in some
// reasonable amount
// of time to allow following requests to be run.
connection.setConnectTimeout(CONNECT_TIMEOUT);
connection.setReadTimeout(READ_TIMEOUT);
connection.setRequestMethod("PUT");
connection.setRequestProperty(CONTENT_TYPE, MSGPACK);
connection.setRequestProperty(DATADOG_META_LANG, "java");

View File

@ -0,0 +1,168 @@
package datadog.trace.tracer.writer;
import datadog.trace.tracer.Trace;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AgentWriter implements Writer {
/** Maximum number of traces kept in memory */
static final int DEFAULT_QUEUE_SIZE = 7000;
/** Flush interval for the API in seconds */
static final long FLUSH_TIME_SECONDS = 1;
/** Maximum amount of time to await for scheduler to shutdown */
static final long SHUTDOWN_TIMEOUT_SECONDS = 1;
private static final ThreadFactory THREAD_FACTORY =
new ThreadFactory() {
@Override
public Thread newThread(final Runnable r) {
final Thread thread = new Thread(r, "dd-agent-writer");
thread.setDaemon(true);
return thread;
}
};
/** Scheduled thread pool, acting like a cron */
private final ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(1, THREAD_FACTORY);
private final TracesSendingTask task;
private final ShutdownCallback shutdownCallback;
public AgentWriter(final AgentClient client) {
this(client, DEFAULT_QUEUE_SIZE);
}
AgentWriter(final AgentClient client, final int queueSize) {
task = new TracesSendingTask(client, queueSize);
shutdownCallback = new ShutdownCallback(executorService);
}
/** @return Datadog agwent URL. Visible for testing. */
URL getAgentUrl() {
return task.getClient().getAgentUrl();
}
@Override
public void write(final Trace trace) {
if (trace.isValid()) {
if (!task.getQueue().offer(trace)) {
log.debug("Writer queue is full, dropping trace {}", trace);
}
}
}
@Override
public void incrementTraceCount() {
task.getTraceCount().incrementAndGet();
}
@Override
public SampleRateByService getSampleRateByService() {
return task.getSampleRateByService().get();
}
@Override
public void start() {
executorService.scheduleAtFixedRate(task, 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS);
try {
Runtime.getRuntime().addShutdownHook(shutdownCallback);
} catch (final IllegalStateException ex) {
// The JVM is already shutting down.
}
}
@Override
public void close() {
// Perform actions needed to shutdown this writer
shutdownCallback.run();
}
@Override
public void finalize() {
close();
}
/** Infinite tasks blocking until some spans come in the queue. */
private static final class TracesSendingTask implements Runnable {
/** The Datadog agent client */
@Getter private final AgentClient client;
/** Queue size */
private final int queueSize;
/** In memory collection of traces waiting for departure */
@Getter private final BlockingQueue<Trace> queue;
/** Number of traces to be written */
@Getter private final AtomicInteger traceCount = new AtomicInteger(0);
/** Sample rate by service returned by Datadog agent */
@Getter
private final AtomicReference<SampleRateByService> sampleRateByService =
new AtomicReference<>(SampleRateByService.EMPTY_INSTANCE);
TracesSendingTask(final AgentClient client, final int queueSize) {
this.client = client;
this.queueSize = queueSize;
queue = new ArrayBlockingQueue<>(queueSize);
}
@Override
public void run() {
try {
final List<Trace> tracesToWrite = new ArrayList<>(queueSize);
queue.drainTo(tracesToWrite);
if (tracesToWrite.size() > 0) {
sampleRateByService.set(client.sendTraces(tracesToWrite, traceCount.getAndSet(0)));
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
}
}
}
/**
* Helper to handle shutting down of the Writer because JVM is shutting down or Writer is closed.
*/
// Visible for testing
static final class ShutdownCallback extends Thread {
private final ExecutorService executorService;
public ShutdownCallback(final ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public void run() {
// We use this logic in two cases:
// * When JVM is shutting down
// * When Writer is closed manually/via GC
// In latter case we need to remove shutdown hook.
try {
Runtime.getRuntime().removeShutdownHook(this);
} catch (final IllegalStateException ex) {
// The JVM may be shutting down.
}
try {
executorService.shutdownNow();
executorService.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
log.info("Writer properly closed and async writer interrupted.");
}
}
}
}

View File

@ -16,6 +16,11 @@ public class LoggingWriter implements Writer {
// Nothing to do here.
}
@Override
public SampleRateByService getSampleRateByService() {
return SampleRateByService.EMPTY_INSTANCE;
}
@Override
public void start() {
// TODO: do we really need this? and if we do - who is responsible for calling this?

View File

@ -12,6 +12,8 @@ import lombok.EqualsAndHashCode;
@EqualsAndHashCode
class SampleRateByService {
static final SampleRateByService EMPTY_INSTANCE = new SampleRateByService(Collections.EMPTY_MAP);
private final Map<String, Double> rateByService;
@JsonCreator

View File

@ -1,6 +1,9 @@
package datadog.trace.tracer.writer;
import datadog.trace.api.Config;
import datadog.trace.tracer.Trace;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
/** A writer sends traces to some place. */
public interface Writer {
@ -21,6 +24,9 @@ public interface Writer {
*/
void incrementTraceCount();
/** @return Most up to date {@link SampleRateByService} instance. */
SampleRateByService getSampleRateByService();
/** Start the writer */
void start();
@ -29,4 +35,41 @@ public interface Writer {
* connections and tasks
*/
void close();
@Slf4j
final class Builder {
public static Writer forConfig(final Config config) {
if (config == null) {
// There is no way config is not create so getting here must be a code bug
throw new NullPointerException("Config is required to create writer");
}
final Writer writer;
final String configuredType = config.getWriterType();
if (Config.DD_AGENT_WRITER_TYPE.equals(configuredType)) {
writer = createAgentWriter(config);
} else if (Config.LOGGING_WRITER_TYPE.equals(configuredType)) {
writer = new LoggingWriter();
} else {
log.warn(
"Writer type not configured correctly: Type {} not recognized. Defaulting to AgentWriter.",
configuredType);
writer = createAgentWriter(config);
}
return writer;
}
public static Writer forConfig(final Properties config) {
return forConfig(Config.get(config));
}
private static Writer createAgentWriter(final Config config) {
return new AgentWriter(new AgentClient(config.getAgentHost(), config.getAgentPort()));
}
private Builder() {}
}
}

View File

@ -2,7 +2,8 @@ package datadog.trace.tracer
import datadog.trace.api.Config
import datadog.trace.tracer.sampling.AllSampler
import datadog.trace.tracer.writer.LoggingWriter
import datadog.trace.tracer.writer.AgentWriter
import datadog.trace.tracer.writer.SampleRateByService
import datadog.trace.tracer.writer.Writer
import spock.lang.Shared
import spock.lang.Specification
@ -24,7 +25,7 @@ class TracerTest extends Specification {
def tracer = new Tracer(config)
then:
tracer.getWriter() instanceof LoggingWriter
tracer.getWriter() instanceof AgentWriter
tracer.getSampler() instanceof AllSampler
tracer.getInterceptors() == []
tracer.getDefaultServiceName() == config.getServiceName()
@ -181,6 +182,11 @@ class TracerTest extends Specification {
traceCount.incrementAndGet()
}
@Override
SampleRateByService getSampleRateByService() {
return null // Doesn't matter for now
}
@Override
void start() {
//nothing to do for now

View File

@ -126,6 +126,21 @@ class AgentClientTest extends Specification {
response == null
}
def "test timeout"() {
setup:
stubFor(put(urlEqualTo(AgentClient.TRACES_ENDPOINT))
.willReturn(aResponse()
.withStatus(200)
.withChunkedDribbleDelay(5, AgentClient.READ_TIMEOUT * 2)))
def trace = createTrace("123")
when:
def response = client.sendTraces([trace], TRACE_COUNT)
then:
response == null
}
def "test invalid url"() {
when:

View File

@ -0,0 +1,180 @@
package datadog.trace.tracer.writer
import datadog.trace.tracer.Trace
import spock.lang.Specification
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
class AgentWriterTest extends Specification {
// Amount of time within with we expect flush to happen.
// We make this slightly longer than flush time.
private static final int FLUSH_DELAY = TimeUnit.SECONDS.toMillis(AgentWriter.FLUSH_TIME_SECONDS * 2)
private static final AGENT_URL = new URL("http://example.com")
def sampleRateByService = Mock(SampleRateByService)
def client = Mock(AgentClient) {
getAgentUrl() >> AGENT_URL
}
def "test happy path"() {
setup:
def incrementTraceCountBy = 5
def traces = [
Mock(Trace) {
isValid() >> true
},
Mock(Trace) {
isValid() >> false
},
Mock(Trace) {
isValid() >> true
}]
def writer = new AgentWriter(client)
writer.start()
when:
for (def trace : traces) {
writer.write(trace)
}
incrementTraceCountBy.times {
writer.incrementTraceCount()
}
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0], traces[2]], incrementTraceCountBy) >> sampleRateByService
and:
writer.getSampleRateByService() == sampleRateByService
then:
0 * client.sendTraces(_, _)
cleanup:
writer.close()
}
def "test small queue"() {
setup:
def traces = [
Mock(Trace) {
isValid() >> true
},
Mock(Trace) {
isValid() >> true
}]
def writer = new AgentWriter(client, 1)
when:
for (def trace : traces) {
writer.write(trace)
}
writer.start()
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0]], 0)
cleanup:
writer.close()
}
def "test client exception handling"() {
setup:
def traces = [
Mock(Trace) {
isValid() >> true
},
Mock(Trace) {
isValid() >> true
}]
def writer = new AgentWriter(client)
writer.start()
when:
writer.write(traces[0])
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[0]], 0) >> { throw new IOException("test exception")}
writer.getSampleRateByService() == SampleRateByService.EMPTY_INSTANCE
when:
writer.write(traces[1])
Thread.sleep(FLUSH_DELAY)
then:
1 * client.sendTraces([traces[1]], 0) >> sampleRateByService
writer.getSampleRateByService() == sampleRateByService
cleanup:
writer.close()
}
def "test agent url getter"() {
setup:
def writer = new AgentWriter(client)
when:
def agentUrl = writer.getAgentUrl()
then:
agentUrl == AGENT_URL
}
def "test default sample rate by service"() {
setup:
def writer = new AgentWriter(client)
when:
def sampleRateByService = writer.getSampleRateByService()
then:
sampleRateByService == SampleRateByService.EMPTY_INSTANCE
}
def "test start/#closeMethod"() {
setup:
def writer = new AgentWriter(client)
expect:
!isWriterThreadRunning()
when:
writer.start()
then:
isWriterThreadRunning()
when:
writer."${closeMethod}"()
then:
!isWriterThreadRunning()
where:
closeMethod | _
"close" | _
"finalize" | _
}
def "test shutdown callback"() {
setup:
def executor = Mock(ExecutorService) {
awaitTermination(_, _) >> { throw new InterruptedException() }
}
def callback = new AgentWriter.ShutdownCallback(executor)
when:
callback.run()
then:
noExceptionThrown()
}
boolean isWriterThreadRunning() {
return Thread.getAllStackTraces().keySet().any{ t -> t.getName() == "dd-agent-writer" }
}
}

View File

@ -37,4 +37,12 @@ class LoggingWriterTest extends Specification {
1 * trace.toString()
}
def "test getter"() {
when:
def sampleRateByInstance = writer.getSampleRateByService()
then:
sampleRateByInstance == SampleRateByService.EMPTY_INSTANCE
}
}

View File

@ -0,0 +1,63 @@
package datadog.trace.tracer.writer
import datadog.trace.api.Config
import spock.lang.Specification
class WriterTest extends Specification {
def "test builder logging writer"() {
setup:
def config = Mock(Config) {
getWriterType() >> Config.LOGGING_WRITER_TYPE
}
when:
def writer = Writer.Builder.forConfig(config)
then:
writer instanceof LoggingWriter
}
def "test builder logging writer properties"() {
setup:
def properties = new Properties()
properties.setProperty(Config.WRITER_TYPE, Config.LOGGING_WRITER_TYPE)
when:
def writer = Writer.Builder.forConfig(properties)
then:
writer instanceof LoggingWriter
}
def "test builder agent writer: '#writerType'"() {
setup:
def config = Mock(Config) {
getWriterType() >> writerType
getAgentHost() >> "test.host"
getAgentPort() >> 1234
}
when:
def writer = Writer.Builder.forConfig(config)
then:
writer instanceof AgentWriter
((AgentWriter) writer).getAgentUrl() == new URL("http://test.host:1234/v0.4/traces");
where:
writerType | _
Config.DD_AGENT_WRITER_TYPE | _
"some odd string" | _
}
def "test builder no config"() {
when:
Writer.Builder.forConfig(null)
then:
thrown NullPointerException
}
}