Support v4 api with fallback to v3

This commit is contained in:
Andrew Kent 2018-01-18 11:22:58 -05:00 committed by Ark
parent 7b03f4e91f
commit 1fc7950ee9
2 changed files with 165 additions and 10 deletions

View File

@ -1,14 +1,19 @@
package datadog.trace.common.writer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.Service;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -19,12 +24,15 @@ import org.msgpack.jackson.dataformat.MessagePackFactory;
@Slf4j
public class DDApi {
private static final String TRACES_ENDPOINT = "/v0.3/traces";
private static final String SERVICES_ENDPOINT = "/v0.3/services";
private static final String TRACES_ENDPOINT_V3 = "/v0.3/traces";
private static final String SERVICES_ENDPOINT_V3 = "/v0.3/services";
private static final String TRACES_ENDPOINT_V4 = "/v0.4/traces";
private static final String SERVICES_ENDPOINT_V4 = "/v0.4/services";
private static final long SECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toSeconds(5);
private final String tracesEndpoint;
private final String servicesEndpoint;
private final List<ResponseListener> responseListeners = new ArrayList<ResponseListener>();
private final RateLimiter loggingRateLimiter =
RateLimiter.create(1.0 / SECONDS_BETWEEN_ERROR_LOG);
@ -32,8 +40,21 @@ public class DDApi {
private final ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory());
public DDApi(final String host, final int port) {
this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT;
this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT;
if (endpointAvailable("http://" + host + ":" + port + TRACES_ENDPOINT_V4)
&& endpointAvailable("http://" + host + ":" + port + SERVICES_ENDPOINT_V4)) {
this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V4;
this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT_V4;
} else {
log.debug("API v0.4 endpoints not available. Downgrading to v0.3");
this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT_V3;
this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT_V3;
}
}
public void addResponseListener(ResponseListener listener) {
if (!responseListeners.contains(listener)) {
responseListeners.add(listener);
}
}
/**
@ -74,6 +95,21 @@ public class DDApi {
out.flush();
out.close();
String responseString = null;
{
final BufferedReader responseReader =
new BufferedReader(new InputStreamReader(httpCon.getInputStream()));
final StringBuilder sb = new StringBuilder();
String line = null;
while ((line = responseReader.readLine()) != null) {
sb.append(line);
}
responseReader.close();
responseString = sb.toString();
}
final int responseCode = httpCon.getResponseCode();
if (responseCode != 200) {
if (log.isDebugEnabled()) {
@ -96,6 +132,19 @@ public class DDApi {
}
log.debug("Succesfully sent {} {} to the DD agent.", size, type);
try {
if (null != responseString
&& !"".equals(responseString.trim())
&& !"OK".equalsIgnoreCase(responseString.trim())) {
JsonNode response = objectMapper.readTree(responseString);
for (ResponseListener listener : responseListeners) {
listener.onResponse(endpoint, response);
}
}
} catch (IOException e) {
log.debug("failed to parse DD agent response: " + responseString, e);
}
return true;
} catch (final IOException e) {
@ -114,11 +163,24 @@ public class DDApi {
}
}
private boolean endpointAvailable(final String endpoint) {
try {
final HttpURLConnection httpCon = getHttpURLConnection(endpoint);
OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream());
out.flush();
out.close();
return httpCon.getResponseCode() == 200;
} catch (IOException e) {
}
return false;
}
private HttpURLConnection getHttpURLConnection(final String endpoint) throws IOException {
final HttpURLConnection httpCon;
final URL url = new URL(endpoint);
httpCon = (HttpURLConnection) url.openConnection();
httpCon.setDoOutput(true);
httpCon.setDoInput(true);
httpCon.setRequestMethod("PUT");
httpCon.setRequestProperty("Content-Type", "application/msgpack");
httpCon.setRequestProperty("Datadog-Meta-Lang", "java");
@ -132,4 +194,9 @@ public class DDApi {
public String toString() {
return "DDApi { tracesEndpoint=" + tracesEndpoint + " }";
}
public static interface ResponseListener {
/** Invoked after the api receives a response from the core agent. */
void onResponse(String endpoint, JsonNode responseJson);
}
}

View File

@ -1,10 +1,12 @@
package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.trace.SpanFactory
import datadog.trace.common.Service
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.DDApi.ResponseListener
import org.msgpack.jackson.dataformat.MessagePackFactory
import ratpack.http.Headers
import ratpack.http.MediaType
@ -21,7 +23,10 @@ class DDApiTest extends Specification {
setup:
def agent = ratpack {
handlers {
put("v0.3/traces") {
put("v0.4/traces") {
response.status(200).send()
}
put("v0.4/services") {
response.status(200).send()
}
}
@ -39,9 +44,12 @@ class DDApiTest extends Specification {
setup:
def agent = ratpack {
handlers {
put("v0.3/traces") {
put("v0.4/traces") {
response.status(404).send()
}
put("v0.4/services") {
response.status(200).send()
}
}
}
def client = new DDApi("localhost", agent.address.port)
@ -60,7 +68,7 @@ class DDApiTest extends Specification {
def requestBody = new AtomicReference<byte[]>()
def agent = ratpack {
handlers {
put("v0.3/traces") {
put("v0.4/traces") {
requestContentType.set(request.contentType)
requestHeaders.set(request.headers)
request.body.then {
@ -68,6 +76,9 @@ class DDApiTest extends Specification {
response.send()
}
}
put("v0.4/services") {
response.status(200).send()
}
}
}
def client = new DDApi("localhost", agent.address.port)
@ -120,7 +131,10 @@ class DDApiTest extends Specification {
setup:
def agent = ratpack {
handlers {
put("v0.3/services") {
put("v0.4/traces") {
response.status(200).send()
}
put("v0.4/services") {
response.status(200).send()
}
}
@ -138,7 +152,10 @@ class DDApiTest extends Specification {
setup:
def agent = ratpack {
handlers {
put("v0.3/services") {
put("v0.4/traces") {
response.status(200).send()
}
put("v0.4/services") {
response.status(404).send()
}
}
@ -159,7 +176,10 @@ class DDApiTest extends Specification {
def requestBody = new AtomicReference<byte[]>()
def agent = ratpack {
handlers {
put("v0.3/services") {
put("v0.4/traces") {
response.status(200).send()
}
put("v0.4/services") {
requestContentType.set(request.contentType)
requestHeaders.set(request.headers)
request.body.then {
@ -192,6 +212,74 @@ class DDApiTest extends Specification {
]
}
def "Api ResponseListeners see 200 responses"() {
setup:
def agentResponse = new AtomicReference<String>(null)
ResponseListener responseListener = new ResponseListener() {
@Override
void onResponse(String endpoint, JsonNode responseJson) {
agentResponse.set(responseJson.toString())
}
}
boolean servicesAvailable = true
def agent = ratpack {
handlers {
put("v0.4/traces") {
response.status(200).send('{"hello":"test"}')
}
put("v0.4/services") {
if (servicesAvailable) {
response.status(200).send('{"service-response":"from-test"}')
} else {
response.status(404).send('{"service-response":"from-test"}')
}
}
}
}
def client = new DDApi("localhost", agent.address.port)
client.addResponseListener(responseListener)
def services = ["my-service-name": new Service("my-service-name", "my-app-name", Service.AppType.CUSTOM)]
when:
client.sendTraces([])
then:
agentResponse.get() == '{"hello":"test"}'
when:
servicesAvailable = false
agentResponse.set('not-set')
client.sendServices(services)
then:
// response not seen because of non-200 status
agentResponse.get() == 'not-set'
cleanup:
agent.close()
}
def "Api Downgrades to v3"() {
setup:
def v3Agent = ratpack {
handlers {
put("v0.3/traces") {
response.status(200).send()
}
put("v0.3/services") {
response.status(200).send()
}
}
}
def client = new DDApi("localhost", v3Agent.address.port)
expect:
client.sendTraces([])
client.sendServices()
cleanup:
v3Agent.close()
}
static List<TreeMap<String, Object>> convertList(byte[] bytes) {
return mapper.readValue(bytes, new TypeReference<List<TreeMap<String, Object>>>() {})
}