From 4072edb405aa378f3378dad0a7efa8638e403dcd Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Mon, 7 Aug 2017 12:04:26 +0200 Subject: [PATCH 1/9] Ability to add extra service information (WIP) --- .../java/com/datadoghq/trace/DDTracer.java | 36 ++++++++++- .../java/com/datadoghq/trace/Service.java | 47 ++++++++++++++ .../datadoghq/trace/writer/DDAgentWriter.java | 4 ++ .../datadoghq/trace/writer/ListWriter.java | 7 +++ .../datadoghq/trace/writer/LoggingWriter.java | 6 ++ .../com/datadoghq/trace/writer/Writer.java | 8 +++ .../com/datadoghq/trace/ServiceTest.groovy | 63 +++++++++++++++++++ 7 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 dd-trace/src/main/java/com/datadoghq/trace/Service.java create mode 100644 dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy diff --git a/dd-trace/src/main/java/com/datadoghq/trace/DDTracer.java b/dd-trace/src/main/java/com/datadoghq/trace/DDTracer.java index 75d1d5820b..c3e1a34a28 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/DDTracer.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/DDTracer.java @@ -7,13 +7,19 @@ import com.datadoghq.trace.sampling.AllSampler; import com.datadoghq.trace.sampling.Sampler; import com.datadoghq.trace.writer.LoggingWriter; import com.datadoghq.trace.writer.Writer; +import com.fasterxml.jackson.annotation.JsonIgnore; import io.opentracing.ActiveSpan; import io.opentracing.ActiveSpanSource; import io.opentracing.BaseSpan; import io.opentracing.SpanContext; import io.opentracing.propagation.Format; import io.opentracing.util.ThreadLocalActiveSpanSource; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; import java.util.concurrent.ThreadLocalRandom; import lombok.extern.slf4j.Slf4j; @@ -37,6 +43,7 @@ public class DDTracer extends ThreadLocalActiveSpanSource implements io.opentrac private final Map> spanContextDecorators = new HashMap<>(); private final CodecRegistry registry; + private final List services = new ArrayList<>(); /** Default constructor, trace/spans are logged, no trace/span dropped */ public DDTracer() { @@ -137,6 +144,33 @@ public class DDTracer extends ThreadLocalActiveSpanSource implements io.opentrac return "DDTracer{" + "writer=" + writer + ", sampler=" + sampler + '}'; } + /** + * Register additional information about a service. Service additional information are a Datadog + * feature only. Services are reported through a specific Datadog endpoint. + * + * @param service additional service information + */ + public void addServiceInfo(final Service service) { + services.add(service); + // Update the write + try { + // We don't bother to send multiple times the list of services at this time + writer.writeServices(services); + } catch (final Throwable ex) { + log.warn("Failed to report additional service information, reason: {}", ex.getMessage()); + } + } + + /** + * Return the list of additional service information registered + * + * @return the list of additional service information + */ + @JsonIgnore + public List getServiceInfo() { + return services; + } + private static class CodecRegistry { private final Map, Codec> codecs = new HashMap<>(); diff --git a/dd-trace/src/main/java/com/datadoghq/trace/Service.java b/dd-trace/src/main/java/com/datadoghq/trace/Service.java new file mode 100644 index 0000000000..1da519279f --- /dev/null +++ b/dd-trace/src/main/java/com/datadoghq/trace/Service.java @@ -0,0 +1,47 @@ +package com.datadoghq.trace; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Service { + + private final String name; + private final String appName; + private final Service.AppType appType; + + public Service(final String name, final String appName, final AppType appType) { + this.name = name; + this.appName = appName; + this.appType = appType; + } + + @JsonProperty("service") + public String getName() { + return name; + } + + @JsonProperty("app") + public String getAppName() { + return appName; + } + + @JsonProperty("app_type") + public AppType getAppType() { + return appType; + } + + public enum AppType { + WEB("web"), + DB("db"), + CUSTOM("custom"); + + private final String type; + + AppType(final String type) { + this.type = type; + } + + public String toString() { + return type; + } + } +} diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 3b27f6576c..ee506073ae 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -1,6 +1,7 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; +import com.datadoghq.trace.Service; import com.google.auto.service.AutoService; import java.util.List; import java.util.concurrent.Callable; @@ -80,6 +81,9 @@ public class DDAgentWriter implements Writer { queueFullReported = false; } + @Override + public void writeServices(final List services) {} + /* (non-Javadoc) * @see com.datadoghq.trace.writer.Writer#start() */ diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java index 52debaa277..e6d2b27356 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java @@ -1,8 +1,10 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; +import com.datadoghq.trace.Service; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** List writer used by tests mostly */ public class ListWriter extends CopyOnWriteArrayList>> implements Writer { @@ -20,6 +22,11 @@ public class ListWriter extends CopyOnWriteArrayList>> implem add(trace); } + @Override + public void writeServices(final List services) { + throw new NotImplementedException(); + } + @Override public void start() { clear(); diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/LoggingWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/LoggingWriter.java index 7579fc6e4f..e4a00c878e 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/LoggingWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/LoggingWriter.java @@ -1,6 +1,7 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; +import com.datadoghq.trace.Service; import com.google.auto.service.AutoService; import java.util.List; import lombok.extern.slf4j.Slf4j; @@ -14,6 +15,11 @@ public class LoggingWriter implements Writer { log.info("write(trace): {}", trace); } + @Override + public void writeServices(final List services) { + log.info("additional service information: {}", services); + } + @Override public void close() { log.info("close()"); diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/Writer.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/Writer.java index 3390b8bc91..c8244e530e 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/Writer.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/Writer.java @@ -1,6 +1,7 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; +import com.datadoghq.trace.Service; import java.util.List; /** A writer is responsible to send collected spans to some place */ @@ -13,6 +14,13 @@ public interface Writer { */ void write(List> trace); + /** + * Report additional service information to the endpoint + * + * @param services a list of extra information about services + */ + void writeServices(List services); + /** Start the writer */ void start(); diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy new file mode 100644 index 0000000000..e00f893de8 --- /dev/null +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy @@ -0,0 +1,63 @@ +package com.datadoghq.trace + +import com.datadoghq.trace.sampling.AllSampler +import com.datadoghq.trace.writer.DDAgentWriter +import org.mockito.Mockito +import spock.lang.Specification + +class ServiceTest extends Specification { + + + def "getter/setter"() { + + setup: + def service = new Service("service-name", "app-name", Service.AppType.CUSTOM) + + expect: + service.getName() == "service-name" + service.getAppName() == "app-name" + service.getAppType() == Service.AppType.CUSTOM + + } + + def "enum"() { + + expect: + Service.AppType.values().size() == 3 + Service.AppType.DB.toString() == "db" + Service.AppType.WEB.toString() == "web" + Service.AppType.CUSTOM.toString() == "custom" + + } + + def "add extra info about a specific service"() { + + setup: + def tracer = new DDTracer() + def service = new Service("service-name", "app-name", Service.AppType.CUSTOM) + + when: + tracer.addServiceInfo(service) + + then: + tracer.getServiceInfo().size() == 1 + tracer.getServiceInfo().get(0) == service + + } + + def "add a extra info is reported to the writer"() { + + setup: + def writer = Mockito.spy(new DDAgentWriter()) + def tracer = new DDTracer(writer, new AllSampler()) + + + when: + tracer.addServiceInfo(new Service("service-name", "app-name", Service.AppType.CUSTOM)) + + then: + Mockito.verify(writer, Mockito.times(1)).writeServices(Mockito.any(List.class)) + + } + +} From 03e1ea9014b8f6a178c46b76e096eaa12dbb59a9 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Mon, 7 Aug 2017 14:25:01 +0200 Subject: [PATCH 2/9] Updating example --- dd-trace/src/test/java/ExampleWithLoggingWriter.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dd-trace/src/test/java/ExampleWithLoggingWriter.java b/dd-trace/src/test/java/ExampleWithLoggingWriter.java index 98913bc147..90d3937638 100644 --- a/dd-trace/src/test/java/ExampleWithLoggingWriter.java +++ b/dd-trace/src/test/java/ExampleWithLoggingWriter.java @@ -1,15 +1,17 @@ import com.datadoghq.trace.DDTracer; +import com.datadoghq.trace.Service; import com.datadoghq.trace.sampling.AllSampler; import com.datadoghq.trace.writer.LoggingWriter; import io.opentracing.Span; public class ExampleWithLoggingWriter { - public static void main(String[] args) throws Exception { + public static void main(final String[] args) throws Exception { - DDTracer tracer = new DDTracer(new LoggingWriter(), new AllSampler()); + final DDTracer tracer = new DDTracer(new LoggingWriter(), new AllSampler()); + tracer.addServiceInfo(new Service("service-foo", "mongo", Service.AppType.DB)); - Span parent = + final Span parent = tracer .buildSpan("hello-world") .withServiceName("service-name") @@ -20,7 +22,7 @@ public class ExampleWithLoggingWriter { Thread.sleep(100); - Span child = + final Span child = tracer .buildSpan("hello-world") .asChildOf(parent) From e874cb2e0d3f516c23c49700f69453a6a2ab9398 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Mon, 7 Aug 2017 15:34:23 +0200 Subject: [PATCH 3/9] Implement service endpoint support --- .../java/com/datadoghq/trace/DDTracer.java | 8 +- .../java/com/datadoghq/trace/Service.java | 19 +++- .../datadoghq/trace/writer/DDAgentWriter.java | 54 +++++++++++- .../com/datadoghq/trace/writer/DDApi.java | 35 ++++++-- .../datadoghq/trace/writer/ListWriter.java | 6 +- .../datadoghq/trace/writer/LoggingWriter.java | 5 +- .../com/datadoghq/trace/writer/Writer.java | 3 +- .../com/datadoghq/trace/ServiceTest.groovy | 4 +- .../datadoghq/trace/writer/DDApiTest.groovy | 86 ++++++++++++++++++- 9 files changed, 199 insertions(+), 21 deletions(-) diff --git a/dd-trace/src/main/java/com/datadoghq/trace/DDTracer.java b/dd-trace/src/main/java/com/datadoghq/trace/DDTracer.java index c3e1a34a28..516ed5abec 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/DDTracer.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/DDTracer.java @@ -43,7 +43,7 @@ public class DDTracer extends ThreadLocalActiveSpanSource implements io.opentrac private final Map> spanContextDecorators = new HashMap<>(); private final CodecRegistry registry; - private final List services = new ArrayList<>(); + private final Map services = new HashMap<>(); /** Default constructor, trace/spans are logged, no trace/span dropped */ public DDTracer() { @@ -151,8 +151,8 @@ public class DDTracer extends ThreadLocalActiveSpanSource implements io.opentrac * @param service additional service information */ public void addServiceInfo(final Service service) { - services.add(service); - // Update the write + services.put(service.getName(), service); + // Update the writer try { // We don't bother to send multiple times the list of services at this time writer.writeServices(services); @@ -167,7 +167,7 @@ public class DDTracer extends ThreadLocalActiveSpanSource implements io.opentrac * @return the list of additional service information */ @JsonIgnore - public List getServiceInfo() { + public Map getServiceInfo() { return services; } diff --git a/dd-trace/src/main/java/com/datadoghq/trace/Service.java b/dd-trace/src/main/java/com/datadoghq/trace/Service.java index 1da519279f..72213c947c 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/Service.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/Service.java @@ -1,6 +1,8 @@ package com.datadoghq.trace; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonValue; public class Service { @@ -14,7 +16,7 @@ public class Service { this.appType = appType; } - @JsonProperty("service") + @JsonIgnore public String getName() { return name; } @@ -29,6 +31,20 @@ public class Service { return appType; } + @Override + public String toString() { + return "Service{" + + "name='" + + name + + '\'' + + ", appName='" + + appName + + '\'' + + ", appType=" + + appType + + '}'; + } + public enum AppType { WEB("web"), DB("db"), @@ -40,6 +56,7 @@ public class Service { this.type = type; } + @JsonValue public String toString() { return type; } diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index ee506073ae..ed4a8a4ec1 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -5,6 +5,9 @@ import com.datadoghq.trace.Service; import com.google.auto.service.AutoService; import java.util.List; import java.util.concurrent.Callable; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -44,6 +47,12 @@ public class DDAgentWriter implements Writer { /** Effective thread pool, where real logic is done */ private final ExecutorService executor = Executors.newSingleThreadExecutor(); + /** In memory collection of services waiting for departure */ + private final BlockingQueue> services = + new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); + + /** Async worker that posts the spans to the DD agent */ + private final ExecutorService executor = Executors.newFixedThreadPool(2); /** The DD agent api */ private final DDApi api; @@ -81,14 +90,26 @@ public class DDAgentWriter implements Writer { queueFullReported = false; } + /* (non-Javadoc) + * @see com.datadoghq.trace.Writer#writeServices(java.util.List) + */ @Override - public void writeServices(final List services) {} + public void writeServices(final Map services) { + + if (!this.services.offer(services)) { + log.warn( + "Cannot add a service list to the async queue, queue is full. Queue max size: {}", + MAX_QUEUE_SIZE); + } + } /* (non-Javadoc) * @see com.datadoghq.trace.writer.Writer#start() */ @Override public void start() { + executor.submit(new SpansSendingTask()); + executor.submit(new ServicesSendingTask()); scheduledExecutor.scheduleAtFixedRate( new TracesSendingTask(), 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS); } @@ -157,4 +178,35 @@ public class DDAgentWriter implements Writer { } } } + + /** Infinite tasks blocking until some spans come in the blocking queue. */ + protected class ServicesSendingTask implements Runnable { + + @Override + public void run() { + while (true) { + try { + + //WAIT until a new service comes + final Map payload = DDAgentWriter.this.services.take(); + + //SEND the payload to the agent + log.debug("Async writer about to write {} services", payload.size()); + if (api.sendServices(payload)) { + log.debug("Async writer just sent {} services", payload.size()); + } else { + log.warn("Failed for Async writer to send {} services", payload.size()); + } + + } catch (final InterruptedException e) { + log.info("Async writer (services) interrupted."); + + //The thread was interrupted, we break the LOOP + break; + } catch (final Throwable e) { + log.error("Unexpected error! Some services may have been dropped.", e); + } + } + } + } } diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDApi.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDApi.java index 79676ee834..3db98b6499 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDApi.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDApi.java @@ -2,6 +2,7 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; import com.datadoghq.trace.DDTraceInfo; +import com.datadoghq.trace.Service; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; @@ -10,6 +11,7 @@ import java.io.OutputStreamWriter; import java.net.HttpURLConnection; import java.net.URL; import java.util.List; +import java.util.Map; import lombok.extern.slf4j.Slf4j; /** The API pointing to a DD agent */ @@ -17,14 +19,17 @@ import lombok.extern.slf4j.Slf4j; public class DDApi { private static final String TRACES_ENDPOINT = "/v0.3/traces"; + private static final String SERVICES_ENDPOINT = "/v0.3/services"; private final String tracesEndpoint; + private final String servicesEndpoint; private final ObjectMapper objectMapper = new ObjectMapper(); private final JsonFactory jsonFactory = objectMapper.getFactory(); public DDApi(final String host, final int port) { this.tracesEndpoint = "http://" + host + ":" + port + TRACES_ENDPOINT; + this.servicesEndpoint = "http://" + host + ":" + port + SERVICES_ENDPOINT; } /** @@ -34,7 +39,7 @@ public class DDApi { * @return the staus code returned */ public boolean sendTraces(final List>> traces) { - final int status = callPUT(traces); + final int status = callPUT(tracesEndpoint, traces); if (status == 200) { log.debug("Succesfully sent {} traces to the DD agent.", traces.size()); return true; @@ -44,6 +49,26 @@ public class DDApi { } } + /** + * Send service extra information to the services endpoint + * + * @param services the services to be sent + */ + public boolean sendServices(final Map services) { + if (services == null) { + return true; + } + final int status = callPUT(servicesEndpoint, services); + if (status == 200) { + log.debug("Succesfully sent {} services to the DD agent.", services.size()); + return true; + } else { + log.warn( + "Error while sending {} services to the DD agent. Status: {}", services.size(), status); + return false; + } + } + /** * PUT to an endpoint the provided JSON content * @@ -51,10 +76,10 @@ public class DDApi { * @param content * @return the status code */ - private int callPUT(final Object content) { + private int callPUT(final String endpoint, final Object content) { HttpURLConnection httpCon = null; try { - httpCon = getHttpURLConnection(); + httpCon = getHttpURLConnection(endpoint); } catch (final Exception e) { log.warn("Error thrown before PUT call to the DD agent.", e); return -1; @@ -82,9 +107,9 @@ public class DDApi { } } - private HttpURLConnection getHttpURLConnection() throws IOException { + private HttpURLConnection getHttpURLConnection(final String endpoint) throws IOException { final HttpURLConnection httpCon; - final URL url = new URL(tracesEndpoint); + final URL url = new URL(endpoint); httpCon = (HttpURLConnection) url.openConnection(); httpCon.setDoOutput(true); httpCon.setRequestMethod("PUT"); diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java index e6d2b27356..a4311c522b 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/ListWriter.java @@ -3,8 +3,8 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; import com.datadoghq.trace.Service; import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** List writer used by tests mostly */ public class ListWriter extends CopyOnWriteArrayList>> implements Writer { @@ -23,8 +23,8 @@ public class ListWriter extends CopyOnWriteArrayList>> implem } @Override - public void writeServices(final List services) { - throw new NotImplementedException(); + public void writeServices(final Map services) { + throw new UnsupportedOperationException(); } @Override diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/LoggingWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/LoggingWriter.java index e4a00c878e..2cdbf1610e 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/LoggingWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/LoggingWriter.java @@ -4,6 +4,7 @@ import com.datadoghq.trace.DDBaseSpan; import com.datadoghq.trace.Service; import com.google.auto.service.AutoService; import java.util.List; +import java.util.Map; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -16,8 +17,8 @@ public class LoggingWriter implements Writer { } @Override - public void writeServices(final List services) { - log.info("additional service information: {}", services); + public void writeServices(final Map services) { + log.info("additional service information: {}", services.values()); } @Override diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/Writer.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/Writer.java index c8244e530e..3d5b786c44 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/Writer.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/Writer.java @@ -3,6 +3,7 @@ package com.datadoghq.trace.writer; import com.datadoghq.trace.DDBaseSpan; import com.datadoghq.trace.Service; import java.util.List; +import java.util.Map; /** A writer is responsible to send collected spans to some place */ public interface Writer { @@ -19,7 +20,7 @@ public interface Writer { * * @param services a list of extra information about services */ - void writeServices(List services); + void writeServices(Map services); /** Start the writer */ void start(); diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy index e00f893de8..900ab04e97 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy @@ -41,7 +41,7 @@ class ServiceTest extends Specification { then: tracer.getServiceInfo().size() == 1 - tracer.getServiceInfo().get(0) == service + tracer.getServiceInfo().get("service-name") == service } @@ -56,7 +56,7 @@ class ServiceTest extends Specification { tracer.addServiceInfo(new Service("service-name", "app-name", Service.AppType.CUSTOM)) then: - Mockito.verify(writer, Mockito.times(1)).writeServices(Mockito.any(List.class)) + Mockito.verify(writer, Mockito.times(1)).writeServices(Mockito.any(Map.class)) } diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/DDApiTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/DDApiTest.groovy index ce9df2296b..a8d7e4f908 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/DDApiTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/DDApiTest.groovy @@ -1,5 +1,6 @@ package com.datadoghq.trace.writer +import com.datadoghq.trace.Service import com.datadoghq.trace.SpanFactory import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.ObjectMapper @@ -76,7 +77,7 @@ class DDApiTest extends Specification { requestHeaders.get().get("Datadog-Meta-Lang") == "java" requestHeaders.get().get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown") requestHeaders.get().get("Datadog-Meta-Tracer-Version") == "unknown" - convert(requestBody.get()) == expectedRequestBody + convertList(requestBody.get()) == expectedRequestBody cleanup: agent.close() @@ -113,7 +114,88 @@ class DDApiTest extends Specification { ])] } - static List> convert(String json) { + // Services endpoint + def "sending an empty map of services returns no errors"() { + setup: + def agent = ratpack { + handlers { + put("v0.3/services") { + response.status(200).send() + } + } + } + def client = new DDApi("localhost", agent.address.port) + + expect: + client.sendServices() + + cleanup: + agent.close() + } + + def "non-200 response results in false returned for services endpoint"() { + setup: + def agent = ratpack { + handlers { + put("v0.3/services") { + response.status(404).send() + } + } + } + def client = new DDApi("localhost", agent.address.port) + + expect: + !client.sendServices([:]) + + cleanup: + agent.close() + } + + def "services content is sent as JSON"() { + setup: + def requestContentType = new AtomicReference() + def requestHeaders = new AtomicReference() + def requestBody = new AtomicReference() + def agent = ratpack { + handlers { + put("v0.3/services") { + requestContentType.set(request.contentType) + requestHeaders.set(request.headers) + request.body.then { + requestBody.set(it.text) + response.send() + } + } + } + } + def client = new DDApi("localhost", agent.address.port) + + expect: + client.sendServices(services) + requestContentType.get().type == APPLICATION_JSON + requestHeaders.get().get("Datadog-Meta-Lang") == "java" + requestHeaders.get().get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown") + requestHeaders.get().get("Datadog-Meta-Tracer-Version") == "unknown" + convertMap(requestBody.get()) == expectedRequestBody + + cleanup: + agent.close() + + // Populate thread info dynamically as it is different when run via gradle vs idea. + where: + services | expectedRequestBody + [:] | [:] + ["service-name": new Service("service-name", "app-name", Service.AppType.CUSTOM)] | ["service-name": new TreeMap<>([ + "app" : "app-name", + "app_type": "custom"]) + ] + } + + static List> convertList(String json) { return mapper.readValue(json, new TypeReference>>() {}) } + + static TreeMap convertMap(String json) { + return mapper.readValue(json, new TypeReference>() {}) + } } From 37cc52596ac6b37534187e322a73ffce637aa257 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Mon, 7 Aug 2017 15:34:41 +0200 Subject: [PATCH 4/9] Update example --- dd-trace/src/test/java/ExampleWithLoggingWriter.java | 4 ++-- dd-trace/src/test/resources/logback.xml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dd-trace/src/test/java/ExampleWithLoggingWriter.java b/dd-trace/src/test/java/ExampleWithLoggingWriter.java index 90d3937638..8f39dfd57e 100644 --- a/dd-trace/src/test/java/ExampleWithLoggingWriter.java +++ b/dd-trace/src/test/java/ExampleWithLoggingWriter.java @@ -9,12 +9,12 @@ public class ExampleWithLoggingWriter { public static void main(final String[] args) throws Exception { final DDTracer tracer = new DDTracer(new LoggingWriter(), new AllSampler()); - tracer.addServiceInfo(new Service("service-foo", "mongo", Service.AppType.DB)); + tracer.addServiceInfo(new Service("service-foo", "mongo", Service.AppType.WEB)); final Span parent = tracer .buildSpan("hello-world") - .withServiceName("service-name") + .withServiceName("service-foo") .withSpanType("web") .startManual(); diff --git a/dd-trace/src/test/resources/logback.xml b/dd-trace/src/test/resources/logback.xml index eb9560df0a..37e22ad73a 100644 --- a/dd-trace/src/test/resources/logback.xml +++ b/dd-trace/src/test/resources/logback.xml @@ -9,7 +9,7 @@ - + From a224fd0829f78d217384cec9a5c0f50e0eddfeb1 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Mon, 7 Aug 2017 16:40:36 +0200 Subject: [PATCH 5/9] fix codenarc --- .../test/groovy/com/datadoghq/trace/ServiceTest.groovy | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy index 900ab04e97..0944aa2727 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy @@ -2,13 +2,15 @@ package com.datadoghq.trace import com.datadoghq.trace.sampling.AllSampler import com.datadoghq.trace.writer.DDAgentWriter -import org.mockito.Mockito import spock.lang.Specification +import static org.mockito.ArgumentMatchers.any +import static org.mockito.Mockito.* + class ServiceTest extends Specification { - def "getter/setter"() { + def "getter and setter"() { setup: def service = new Service("service-name", "app-name", Service.AppType.CUSTOM) @@ -48,7 +50,7 @@ class ServiceTest extends Specification { def "add a extra info is reported to the writer"() { setup: - def writer = Mockito.spy(new DDAgentWriter()) + def writer = spy(new DDAgentWriter()) def tracer = new DDTracer(writer, new AllSampler()) @@ -56,7 +58,7 @@ class ServiceTest extends Specification { tracer.addServiceInfo(new Service("service-name", "app-name", Service.AppType.CUSTOM)) then: - Mockito.verify(writer, Mockito.times(1)).writeServices(Mockito.any(Map.class)) + verify(writer, times(1)).writeServices(any(Map)) } From 0d79f0cf08053072338b309d2e5703c2afa7948b Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Tue, 8 Aug 2017 09:19:18 +0200 Subject: [PATCH 6/9] reviews --- .../java/com/datadoghq/trace/Service.java | 4 +- .../datadoghq/trace/writer/DDAgentWriter.java | 55 +++++-------------- .../com/datadoghq/trace/ServiceTest.groovy | 16 +++--- .../test/java/ExampleWithLoggingWriter.java | 15 ++--- 4 files changed, 32 insertions(+), 58 deletions(-) diff --git a/dd-trace/src/main/java/com/datadoghq/trace/Service.java b/dd-trace/src/main/java/com/datadoghq/trace/Service.java index 72213c947c..6682ef68d3 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/Service.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/Service.java @@ -48,7 +48,9 @@ public class Service { public enum AppType { WEB("web"), DB("db"), - CUSTOM("custom"); + CUSTOM("custom"), + CACHE("cache"), + WORKER("worker"); private final String type; diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index ed4a8a4ec1..32c655db8e 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -47,9 +47,6 @@ public class DDAgentWriter implements Writer { /** Effective thread pool, where real logic is done */ private final ExecutorService executor = Executors.newSingleThreadExecutor(); - /** In memory collection of services waiting for departure */ - private final BlockingQueue> services = - new ArrayBlockingQueue<>(MAX_QUEUE_SIZE); /** Async worker that posts the spans to the DD agent */ private final ExecutorService executor = Executors.newFixedThreadPool(2); @@ -96,11 +93,20 @@ public class DDAgentWriter implements Writer { @Override public void writeServices(final Map services) { - if (!this.services.offer(services)) { - log.warn( - "Cannot add a service list to the async queue, queue is full. Queue max size: {}", - MAX_QUEUE_SIZE); - } + final Runnable task = + new Runnable() { + @Override + public void run() { + //SEND the payload to the agent + log.debug("Async writer about to write {} services", services.size()); + if (api.sendServices(services)) { + log.debug("Async writer just sent {} services", services.size()); + } else { + log.warn("Failed for Async writer to send {} services", services.size()); + } + } + }; + executor.submit(task); } /* (non-Javadoc) @@ -108,8 +114,6 @@ public class DDAgentWriter implements Writer { */ @Override public void start() { - executor.submit(new SpansSendingTask()); - executor.submit(new ServicesSendingTask()); scheduledExecutor.scheduleAtFixedRate( new TracesSendingTask(), 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS); } @@ -178,35 +182,4 @@ public class DDAgentWriter implements Writer { } } } - - /** Infinite tasks blocking until some spans come in the blocking queue. */ - protected class ServicesSendingTask implements Runnable { - - @Override - public void run() { - while (true) { - try { - - //WAIT until a new service comes - final Map payload = DDAgentWriter.this.services.take(); - - //SEND the payload to the agent - log.debug("Async writer about to write {} services", payload.size()); - if (api.sendServices(payload)) { - log.debug("Async writer just sent {} services", payload.size()); - } else { - log.warn("Failed for Async writer to send {} services", payload.size()); - } - - } catch (final InterruptedException e) { - log.info("Async writer (services) interrupted."); - - //The thread was interrupted, we break the LOOP - break; - } catch (final Throwable e) { - log.error("Unexpected error! Some services may have been dropped.", e); - } - } - } - } } diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy index 0944aa2727..c50c6dad4f 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/ServiceTest.groovy @@ -13,11 +13,11 @@ class ServiceTest extends Specification { def "getter and setter"() { setup: - def service = new Service("service-name", "app-name", Service.AppType.CUSTOM) + def service = new Service("api-intake", "kafka", Service.AppType.CUSTOM) expect: - service.getName() == "service-name" - service.getAppName() == "app-name" + service.getName() == "api-intake" + service.getAppName() == "kafka" service.getAppType() == Service.AppType.CUSTOM } @@ -25,10 +25,12 @@ class ServiceTest extends Specification { def "enum"() { expect: - Service.AppType.values().size() == 3 + Service.AppType.values().size() == 5 Service.AppType.DB.toString() == "db" Service.AppType.WEB.toString() == "web" Service.AppType.CUSTOM.toString() == "custom" + Service.AppType.WORKER.toString() == "worker" + Service.AppType.CACHE.toString() == "cache" } @@ -36,14 +38,14 @@ class ServiceTest extends Specification { setup: def tracer = new DDTracer() - def service = new Service("service-name", "app-name", Service.AppType.CUSTOM) + def service = new Service("api-intake", "kafka", Service.AppType.CUSTOM) when: tracer.addServiceInfo(service) then: tracer.getServiceInfo().size() == 1 - tracer.getServiceInfo().get("service-name") == service + tracer.getServiceInfo().get("api-intake") == service } @@ -55,7 +57,7 @@ class ServiceTest extends Specification { when: - tracer.addServiceInfo(new Service("service-name", "app-name", Service.AppType.CUSTOM)) + tracer.addServiceInfo(new Service("api-intake", "kafka", Service.AppType.CUSTOM)) then: verify(writer, times(1)).writeServices(any(Map)) diff --git a/dd-trace/src/test/java/ExampleWithLoggingWriter.java b/dd-trace/src/test/java/ExampleWithLoggingWriter.java index 8f39dfd57e..aed050d50c 100644 --- a/dd-trace/src/test/java/ExampleWithLoggingWriter.java +++ b/dd-trace/src/test/java/ExampleWithLoggingWriter.java @@ -9,24 +9,20 @@ public class ExampleWithLoggingWriter { public static void main(final String[] args) throws Exception { final DDTracer tracer = new DDTracer(new LoggingWriter(), new AllSampler()); - tracer.addServiceInfo(new Service("service-foo", "mongo", Service.AppType.WEB)); + tracer.addServiceInfo(new Service("api-intake", "spark", Service.AppType.CACHE)); final Span parent = - tracer - .buildSpan("hello-world") - .withServiceName("service-foo") - .withSpanType("web") - .startManual(); + tracer.buildSpan("fetch.backend").withServiceName("api-intake").startManual(); - parent.setBaggageItem("a-baggage", "value"); + parent.setBaggageItem("scope-id", "a-1337"); Thread.sleep(100); final Span child = tracer - .buildSpan("hello-world") + .buildSpan("delete.resource") .asChildOf(parent) - .withResourceName("resource-name") + .withResourceName("delete") .startManual(); Thread.sleep(100); @@ -36,5 +32,6 @@ public class ExampleWithLoggingWriter { Thread.sleep(100); parent.finish(); + tracer.close(); } } From ab5926afe4eceb3414209ddbaae7c1987c6b1fb7 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Tue, 8 Aug 2017 09:22:22 +0200 Subject: [PATCH 7/9] reviews --- dd-trace/src/test/java/ExampleWithLoggingWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dd-trace/src/test/java/ExampleWithLoggingWriter.java b/dd-trace/src/test/java/ExampleWithLoggingWriter.java index aed050d50c..74195df8b9 100644 --- a/dd-trace/src/test/java/ExampleWithLoggingWriter.java +++ b/dd-trace/src/test/java/ExampleWithLoggingWriter.java @@ -12,7 +12,7 @@ public class ExampleWithLoggingWriter { tracer.addServiceInfo(new Service("api-intake", "spark", Service.AppType.CACHE)); final Span parent = - tracer.buildSpan("fetch.backend").withServiceName("api-intake").startManual(); + tracer.buildSpan("fetch.backend").withServiceName("api-intake").startManual(); parent.setBaggageItem("scope-id", "a-1337"); @@ -20,9 +20,9 @@ public class ExampleWithLoggingWriter { final Span child = tracer - .buildSpan("delete.resource") + .buildSpan("delete.resource") .asChildOf(parent) - .withResourceName("delete") + .withResourceName("delete") .startManual(); Thread.sleep(100); From b602c53c8462b35aaf1ca3a581bc9c1b6b9c96a8 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Tue, 8 Aug 2017 09:22:48 +0200 Subject: [PATCH 8/9] reviews --- dd-trace/src/test/java/ExampleWithLoggingWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dd-trace/src/test/java/ExampleWithLoggingWriter.java b/dd-trace/src/test/java/ExampleWithLoggingWriter.java index 74195df8b9..aed050d50c 100644 --- a/dd-trace/src/test/java/ExampleWithLoggingWriter.java +++ b/dd-trace/src/test/java/ExampleWithLoggingWriter.java @@ -12,7 +12,7 @@ public class ExampleWithLoggingWriter { tracer.addServiceInfo(new Service("api-intake", "spark", Service.AppType.CACHE)); final Span parent = - tracer.buildSpan("fetch.backend").withServiceName("api-intake").startManual(); + tracer.buildSpan("fetch.backend").withServiceName("api-intake").startManual(); parent.setBaggageItem("scope-id", "a-1337"); @@ -20,9 +20,9 @@ public class ExampleWithLoggingWriter { final Span child = tracer - .buildSpan("delete.resource") + .buildSpan("delete.resource") .asChildOf(parent) - .withResourceName("delete") + .withResourceName("delete") .startManual(); Thread.sleep(100); From 848f0e824cd8f60ac32e2456c4cacb540b01ea31 Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Thu, 17 Aug 2017 09:39:23 +0200 Subject: [PATCH 9/9] rebasing --- .../java/com/datadoghq/trace/writer/DDAgentWriter.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java index 32c655db8e..5d960a93e4 100644 --- a/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java +++ b/dd-trace/src/main/java/com/datadoghq/trace/writer/DDAgentWriter.java @@ -4,10 +4,8 @@ import com.datadoghq.trace.DDBaseSpan; import com.datadoghq.trace.Service; import com.google.auto.service.AutoService; import java.util.List; -import java.util.concurrent.Callable; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -48,9 +46,6 @@ public class DDAgentWriter implements Writer { /** Effective thread pool, where real logic is done */ private final ExecutorService executor = Executors.newSingleThreadExecutor(); - /** Async worker that posts the spans to the DD agent */ - private final ExecutorService executor = Executors.newFixedThreadPool(2); - /** The DD agent api */ private final DDApi api;