Merge pull request #766 from DataDog/mar-kolya/implement-http-timeout

Implement HTTP timeout when sending traces to Datadog agent
This commit is contained in:
Nikolay Martynov 2019-03-18 13:20:32 -04:00 committed by GitHub
commit 6508b8754e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 56 deletions

View File

@ -6,14 +6,10 @@ import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
import datadog.opentracing.DDSpan; import datadog.opentracing.DDSpan;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
@ -32,9 +28,6 @@ public class DDAgentWriter implements Writer {
/** Maximum number of traces kept in memory */ /** Maximum number of traces kept in memory */
static final int DEFAULT_MAX_TRACES = 7000; static final int DEFAULT_MAX_TRACES = 7000;
/** Timeout for the API in seconds */
static final long API_TIMEOUT_SECONDS = 1;
/** Flush interval for the API in seconds */ /** Flush interval for the API in seconds */
static final long FLUSH_TIME_SECONDS = 1; static final long FLUSH_TIME_SECONDS = 1;
@ -52,10 +45,6 @@ public class DDAgentWriter implements Writer {
private final ScheduledExecutorService scheduledExecutor = private final ScheduledExecutorService scheduledExecutor =
Executors.newScheduledThreadPool(1, agentWriterThreadFactory); Executors.newScheduledThreadPool(1, agentWriterThreadFactory);
/** Effective thread pool, where real logic is done */
private final ExecutorService executor =
Executors.newSingleThreadExecutor(agentWriterThreadFactory);
/** The DD agent api */ /** The DD agent api */
private final DDApi api; private final DDApi api;
@ -107,18 +96,11 @@ public class DDAgentWriter implements Writer {
@Override @Override
public void close() { public void close() {
scheduledExecutor.shutdownNow(); scheduledExecutor.shutdownNow();
executor.shutdownNow();
try { try {
scheduledExecutor.awaitTermination(500, TimeUnit.MILLISECONDS); scheduledExecutor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
log.info("Writer properly closed and async writer interrupted."); log.info("Writer properly closed and async writer interrupted.");
} }
try {
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} catch (final InterruptedException e) {
log.info("Writer properly closed and async writer interrupted.");
}
} }
@Override @Override
@ -130,30 +112,12 @@ public class DDAgentWriter implements Writer {
return api; return api;
} }
/** Infinite tasks blocking until some spans come in the blocking queue. */
class TracesSendingTask implements Runnable { class TracesSendingTask implements Runnable {
@Override @Override
public void run() { public void run() {
final Future<Long> future = executor.submit(new SendingTask());
try { try {
final long nbTraces = future.get(API_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (nbTraces > 0) {
log.debug("Successfully sent {} traces to the API", nbTraces);
}
} catch (final TimeoutException e) {
log.debug("Timeout! Failed to send traces to the API: {}", e.getMessage());
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
}
}
class SendingTask implements Callable<Long> {
@Override
public Long call() throws Exception {
if (traces.isEmpty()) { if (traces.isEmpty()) {
return 0L; return;
} }
final List<List<DDSpan>> payload = traces.getAll(); final List<List<DDSpan>> payload = traces.getAll();
@ -163,16 +127,17 @@ public class DDAgentWriter implements Writer {
for (final List<?> trace : payload) { for (final List<?> trace : payload) {
nbSpans += trace.size(); nbSpans += trace.size();
} }
log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans); log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans);
} }
final boolean isSent = api.sendTraces(payload);
if (!isSent) { final boolean isSent = api.sendTraces(payload);
log.debug("Failing to send {} traces to the API", payload.size()); if (isSent) {
return 0L; log.debug("Successfully sent {} traces to the API", payload.size());
} else {
log.debug("Failed to send {} traces to the API", payload.size());
} }
return (long) payload.size(); } catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
} }
} }
} }

View File

@ -31,6 +31,7 @@ public class DDApi {
private static final String DATADOG_META_TRACER_VERSION = "Datadog-Meta-Tracer-Version"; private static final String DATADOG_META_TRACER_VERSION = "Datadog-Meta-Tracer-Version";
private static final String X_DATADOG_TRACE_COUNT = "X-Datadog-Trace-Count"; private static final String X_DATADOG_TRACE_COUNT = "X-Datadog-Trace-Count";
private static final int HTTP_TIMEOUT = 1; // 1 second for conenct/read/write operations
private static final String TRACES_ENDPOINT_V3 = "v0.3/traces"; private static final String TRACES_ENDPOINT_V3 = "v0.3/traces";
private static final String TRACES_ENDPOINT_V4 = "v0.4/traces"; private static final String TRACES_ENDPOINT_V4 = "v0.4/traces";
private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5); private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5);
@ -59,7 +60,7 @@ public class DDApi {
final int port, final int port,
final boolean v4EndpointsAvailable, final boolean v4EndpointsAvailable,
final String unixDomainSocketPath) { final String unixDomainSocketPath) {
httpClient = buildHttpClient(unixDomainSocketPath, false); httpClient = buildHttpClient(unixDomainSocketPath);
if (v4EndpointsAvailable) { if (v4EndpointsAvailable) {
tracesUrl = getUrl(host, port, TRACES_ENDPOINT_V4); tracesUrl = getUrl(host, port, TRACES_ENDPOINT_V4);
@ -166,8 +167,7 @@ public class DDApi {
final Object data, final Object data,
final boolean retry) { final boolean retry) {
try { try {
// This is potentially called in premain, so we want to fail fast. final OkHttpClient client = buildHttpClient(unixDomainSocketPath);
final OkHttpClient client = buildHttpClient(unixDomainSocketPath, true);
final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(data)); final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(data));
final Request request = prepareRequest(url).put(body).build(); final Request request = prepareRequest(url).put(body).build();
@ -182,20 +182,16 @@ public class DDApi {
return false; return false;
} }
private static OkHttpClient buildHttpClient( private static OkHttpClient buildHttpClient(final String unixDomainSocketPath) {
final String unixDomainSocketPath, final boolean setTimeouts) {
OkHttpClient.Builder builder = new OkHttpClient.Builder(); OkHttpClient.Builder builder = new OkHttpClient.Builder();
if (unixDomainSocketPath != null) { if (unixDomainSocketPath != null) {
builder = builder.socketFactory(new UnixDomainSocketFactory(new File(unixDomainSocketPath))); builder = builder.socketFactory(new UnixDomainSocketFactory(new File(unixDomainSocketPath)));
} }
if (setTimeouts) { return builder
builder = .connectTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS)
builder .writeTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS)
.connectTimeout(1, TimeUnit.SECONDS) .readTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS)
.writeTimeout(1, TimeUnit.SECONDS) .build();
.readTimeout(1, TimeUnit.SECONDS);
}
return builder.build();
} }
private static HttpUrl getUrl(final String host, final int port, final String endPoint) { private static HttpUrl getUrl(final String host, final int port, final String endPoint) {