diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java index 558915a1f8..815cb92c9b 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDApi.java @@ -1,14 +1,19 @@ package datadog.trace.common.writer; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.RateLimiter; import datadog.opentracing.DDSpan; import datadog.opentracing.DDTraceOTInfo; import datadog.trace.common.Service; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.net.HttpURLConnection; import java.net.URL; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -19,12 +24,15 @@ import org.msgpack.jackson.dataformat.MessagePackFactory; @Slf4j public class DDApi { - private static final String TRACES_ENDPOINT = "/v0.3/traces"; - private static final String SERVICES_ENDPOINT = "/v0.3/services"; + private static final String TRACES_ENDPOINT_V3 = "/v0.3/traces"; + private static final String SERVICES_ENDPOINT_V3 = "/v0.3/services"; + private static final String TRACES_ENDPOINT_V4 = "/v0.4/traces"; + private static final String SERVICES_ENDPOINT_V4 = "/v0.4/services"; private static final long SECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toSeconds(5); private final String tracesEndpoint; private final String servicesEndpoint; + private final List responseListeners = new ArrayList(); private final RateLimiter loggingRateLimiter = RateLimiter.create(1.0 / SECONDS_BETWEEN_ERROR_LOG); @@ -32,8 +40,21 @@ public class DDApi { private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory()); public DDApi(final String host, final int port) { - this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT; - this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT; + if (endpointAvailable("http://" + host + ":" + port + TRACES_ENDPOINT_V4) + && endpointAvailable("http://" + host + ":" + port + SERVICES_ENDPOINT_V4)) { + this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V4; + this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT_V4; + } else { + log.debug("API v0.4 endpoints not available. Downgrading to v0.3"); + this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V3; + this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT_V3; + } + } + + public void addResponseListener(ResponseListener listener) { + if (!responseListeners.contains(listener)) { + responseListeners.add(listener); + } } /** @@ -74,6 +95,21 @@ public class DDApi { out.flush(); out.close(); + String responseString = null; + { + final BufferedReader responseReader = + new BufferedReader(new InputStreamReader(httpCon.getInputStream())); + final StringBuilder sb = new StringBuilder(); + + String line = null; + while ((line = responseReader.readLine()) != null) { + sb.append(line); + } + responseReader.close(); + + responseString = sb.toString(); + } + final int responseCode = httpCon.getResponseCode(); if (responseCode != 200) { if (log.isDebugEnabled()) { @@ -96,6 +132,19 @@ public class DDApi { } log.debug("Succesfully sent {} {} to the DD agent.", size, type); + + try { + if (null != responseString + && !"".equals(responseString.trim()) + && !"OK".equalsIgnoreCase(responseString.trim())) { + JsonNode response = objectMapper.readTree(responseString); + for (ResponseListener listener : responseListeners) { + listener.onResponse(endpoint, response); + } + } + } catch (IOException e) { + log.debug("failed to parse DD agent response: " + responseString, e); + } return true; } catch (final IOException e) { @@ -114,11 +163,24 @@ public class DDApi { } } + private boolean endpointAvailable(final String endpoint) { + try { + final HttpURLConnection httpCon = getHttpURLConnection(endpoint); + OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream()); + out.flush(); + out.close(); + return httpCon.getResponseCode() == 200; + } catch (IOException e) { + } + return false; + } + private HttpURLConnection getHttpURLConnection(final String endpoint) throws IOException { final HttpURLConnection httpCon; final URL url = new URL(endpoint); httpCon = (HttpURLConnection) url.openConnection(); httpCon.setDoOutput(true); + httpCon.setDoInput(true); httpCon.setRequestMethod("PUT"); httpCon.setRequestProperty("Content-Type", "application/msgpack"); httpCon.setRequestProperty("Datadog-Meta-Lang", "java"); @@ -132,4 +194,9 @@ public class DDApi { public String toString() { return "DDApi { tracesEndpoint=" + tracesEndpoint + " }"; } + + public static interface ResponseListener { + /** Invoked after the api receives a response from the core agent. */ + void onResponse(String endpoint, JsonNode responseJson); + } } diff --git a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy index 2987248721..9029f62dc5 100644 --- a/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/trace/api/writer/DDApiTest.groovy @@ -1,10 +1,12 @@ package datadog.trace.api.writer import com.fasterxml.jackson.core.type.TypeReference +import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.ObjectMapper import datadog.trace.SpanFactory import datadog.trace.common.Service import datadog.trace.common.writer.DDApi +import datadog.trace.common.writer.DDApi.ResponseListener import org.msgpack.jackson.dataformat.MessagePackFactory import ratpack.http.Headers import ratpack.http.MediaType @@ -21,7 +23,10 @@ class DDApiTest extends Specification { setup: def agent = ratpack { handlers { - put("v0.3/traces") { + put("v0.4/traces") { + response.status(200).send() + } + put("v0.4/services") { response.status(200).send() } } @@ -39,9 +44,12 @@ class DDApiTest extends Specification { setup: def agent = ratpack { handlers { - put("v0.3/traces") { + put("v0.4/traces") { response.status(404).send() } + put("v0.4/services") { + response.status(200).send() + } } } def client = new DDApi("localhost", agent.address.port) @@ -60,7 +68,7 @@ class DDApiTest extends Specification { def requestBody = new AtomicReference() def agent = ratpack { handlers { - put("v0.3/traces") { + put("v0.4/traces") { requestContentType.set(request.contentType) requestHeaders.set(request.headers) request.body.then { @@ -68,6 +76,9 @@ class DDApiTest extends Specification { response.send() } } + put("v0.4/services") { + response.status(200).send() + } } } def client = new DDApi("localhost", agent.address.port) @@ -120,7 +131,10 @@ class DDApiTest extends Specification { setup: def agent = ratpack { handlers { - put("v0.3/services") { + put("v0.4/traces") { + response.status(200).send() + } + put("v0.4/services") { response.status(200).send() } } @@ -138,7 +152,10 @@ class DDApiTest extends Specification { setup: def agent = ratpack { handlers { - put("v0.3/services") { + put("v0.4/traces") { + response.status(200).send() + } + put("v0.4/services") { response.status(404).send() } } @@ -159,7 +176,10 @@ class DDApiTest extends Specification { def requestBody = new AtomicReference() def agent = ratpack { handlers { - put("v0.3/services") { + put("v0.4/traces") { + response.status(200).send() + } + put("v0.4/services") { requestContentType.set(request.contentType) requestHeaders.set(request.headers) request.body.then { @@ -192,6 +212,74 @@ class DDApiTest extends Specification { ] } + def "Api ResponseListeners see 200 responses"() { + setup: + def agentResponse = new AtomicReference(null) + ResponseListener responseListener = new ResponseListener() { + @Override + void onResponse(String endpoint, JsonNode responseJson) { + agentResponse.set(responseJson.toString()) + } + } + boolean servicesAvailable = true + def agent = ratpack { + handlers { + put("v0.4/traces") { + response.status(200).send('{"hello":"test"}') + } + put("v0.4/services") { + if (servicesAvailable) { + response.status(200).send('{"service-response":"from-test"}') + } else { + response.status(404).send('{"service-response":"from-test"}') + } + } + } + } + def client = new DDApi("localhost", agent.address.port) + client.addResponseListener(responseListener) + def services = ["my-service-name": new Service("my-service-name", "my-app-name", Service.AppType.CUSTOM)] + + when: + client.sendTraces([]) + then: + agentResponse.get() == '{"hello":"test"}' + + when: + servicesAvailable = false + agentResponse.set('not-set') + client.sendServices(services) + then: + // response not seen because of non-200 status + agentResponse.get() == 'not-set' + + + cleanup: + agent.close() + } + + def "Api Downgrades to v3"() { + setup: + def v3Agent = ratpack { + handlers { + put("v0.3/traces") { + response.status(200).send() + } + put("v0.3/services") { + response.status(200).send() + } + } + } + def client = new DDApi("localhost", v3Agent.address.port) + + expect: + client.sendTraces([]) + client.sendServices() + + cleanup: + v3Agent.close() + } + static List> convertList(byte[] bytes) { return mapper.readValue(bytes, new TypeReference>>() {}) }