diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index f94afb2d4b..a8c6e070ec 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -6,14 +6,10 @@ import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT; import datadog.opentracing.DDSpan; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import lombok.extern.slf4j.Slf4j; /** @@ -32,9 +28,6 @@ public class DDAgentWriter implements Writer { /** Maximum number of traces kept in memory */ static final int DEFAULT_MAX_TRACES = 7000; - /** Timeout for the API in seconds */ - static final long API_TIMEOUT_SECONDS = 1; - /** Flush interval for the API in seconds */ static final long FLUSH_TIME_SECONDS = 1; @@ -52,10 +45,6 @@ public class DDAgentWriter implements Writer { private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1, agentWriterThreadFactory); - /** Effective thread pool, where real logic is done */ - private final ExecutorService executor = - Executors.newSingleThreadExecutor(agentWriterThreadFactory); - /** The DD agent api */ private final DDApi api; @@ -107,18 +96,11 @@ public class DDAgentWriter implements Writer { @Override public void close() { scheduledExecutor.shutdownNow(); - executor.shutdownNow(); try { scheduledExecutor.awaitTermination(500, TimeUnit.MILLISECONDS); } catch (final InterruptedException e) { log.info("Writer properly closed and async writer interrupted."); } - - try { - executor.awaitTermination(500, TimeUnit.MILLISECONDS); - } catch (final InterruptedException e) { - log.info("Writer properly closed and async writer interrupted."); - } } @Override @@ -130,30 +112,12 @@ public class DDAgentWriter implements Writer { return api; } - /** Infinite tasks blocking until some spans come in the blocking queue. */ class TracesSendingTask implements Runnable { - @Override public void run() { - final Future future = executor.submit(new SendingTask()); try { - final long nbTraces = future.get(API_TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (nbTraces > 0) { - log.debug("Successfully sent {} traces to the API", nbTraces); - } - } catch (final TimeoutException e) { - log.debug("Timeout! Failed to send traces to the API: {}", e.getMessage()); - } catch (final Throwable e) { - log.debug("Failed to send traces to the API: {}", e.getMessage()); - } - } - - class SendingTask implements Callable { - - @Override - public Long call() throws Exception { if (traces.isEmpty()) { - return 0L; + return; } final List> payload = traces.getAll(); @@ -163,16 +127,17 @@ public class DDAgentWriter implements Writer { for (final List trace : payload) { nbSpans += trace.size(); } - log.debug("Sending {} traces ({} spans) to the API (async)", payload.size(), nbSpans); } - final boolean isSent = api.sendTraces(payload); - if (!isSent) { - log.debug("Failing to send {} traces to the API", payload.size()); - return 0L; + final boolean isSent = api.sendTraces(payload); + if (isSent) { + log.debug("Successfully sent {} traces to the API", payload.size()); + } else { + log.debug("Failed to send {} traces to the API", payload.size()); } - return (long) payload.size(); + } catch (final Throwable e) { + log.debug("Failed to send traces to the API: {}", e.getMessage()); } } } diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java index f4a98dab0b..8d2bdbb509 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java @@ -31,6 +31,7 @@ public class DDApi { private static final String DATADOG_META_TRACER_VERSION = "Datadog-Meta-Tracer-Version"; private static final String X_DATADOG_TRACE_COUNT = "X-Datadog-Trace-Count"; + private static final int HTTP_TIMEOUT = 1; // 1 second for conenct/read/write operations private static final String TRACES_ENDPOINT_V3 = "v0.3/traces"; private static final String TRACES_ENDPOINT_V4 = "v0.4/traces"; private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5); @@ -59,7 +60,7 @@ public class DDApi { final int port, final boolean v4EndpointsAvailable, final String unixDomainSocketPath) { - httpClient = buildHttpClient(unixDomainSocketPath, false); + httpClient = buildHttpClient(unixDomainSocketPath); if (v4EndpointsAvailable) { tracesUrl = getUrl(host, port, TRACES_ENDPOINT_V4); @@ -166,8 +167,7 @@ public class DDApi { final Object data, final boolean retry) { try { - // This is potentially called in premain, so we want to fail fast. - final OkHttpClient client = buildHttpClient(unixDomainSocketPath, true); + final OkHttpClient client = buildHttpClient(unixDomainSocketPath); final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(data)); final Request request = prepareRequest(url).put(body).build(); @@ -182,20 +182,16 @@ public class DDApi { return false; } - private static OkHttpClient buildHttpClient( - final String unixDomainSocketPath, final boolean setTimeouts) { + private static OkHttpClient buildHttpClient(final String unixDomainSocketPath) { OkHttpClient.Builder builder = new OkHttpClient.Builder(); if (unixDomainSocketPath != null) { builder = builder.socketFactory(new UnixDomainSocketFactory(new File(unixDomainSocketPath))); } - if (setTimeouts) { - builder = - builder - .connectTimeout(1, TimeUnit.SECONDS) - .writeTimeout(1, TimeUnit.SECONDS) - .readTimeout(1, TimeUnit.SECONDS); - } - return builder.build(); + return builder + .connectTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS) + .writeTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS) + .readTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS) + .build(); } private static HttpUrl getUrl(final String host, final int port, final String endPoint) {