From 0663d34aeb4a95ec8e1479f409675f178a36bbfe Mon Sep 17 00:00:00 2001 From: renaudboutet Date: Wed, 26 Apr 2017 19:18:10 +0200 Subject: [PATCH 1/2] First version of the writer working with the Agent API --- src/main/java/com/datadoghq/trace/Writer.java | 9 ++- .../trace/writer/impl/DDAgentWriter.java | 62 +++++++++++++------ .../datadoghq/trace/writer/impl/DDApi.java | 2 +- 3 files changed, 51 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/datadoghq/trace/Writer.java b/src/main/java/com/datadoghq/trace/Writer.java index ebf0360422..afde6d4264 100644 --- a/src/main/java/com/datadoghq/trace/Writer.java +++ b/src/main/java/com/datadoghq/trace/Writer.java @@ -1,10 +1,17 @@ package com.datadoghq.trace; +import java.util.List; + import io.opentracing.Span; public interface Writer { - public void write(Span span); + /** + * Write a trace represented by the entire list of all the finished spans + * + * @param trace + */ + public void write(List trace); public void close(); } diff --git a/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java index 71e1fa453a..4c6cff9b59 100644 --- a/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java +++ b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Semaphore; import com.datadoghq.trace.Writer; @@ -13,32 +14,40 @@ public class DDAgentWriter implements Writer { protected static final String DEFAULT_HOSTNAME = "localhost"; protected static final int DEFAULT_PORT = 8126; - - protected static final int DEFAULT_MAX_TRACES = 1000; + + protected static final int DEFAULT_MAX_SPANS = 1000; protected static final int DEFAULT_BATCH_SIZE = 10; protected static final int DEFAULT_MAX_SERVICES = 1000; protected static final long DEFAULT_TIMEOUT = 5000; - protected final BlockingQueue commandQueue; + private final Semaphore tokens; + protected final BlockingQueue> traces; protected final Thread asyncWriterThread; + protected final DDApi api; + public DDAgentWriter() { super(); - commandQueue = new ArrayBlockingQueue(DEFAULT_MAX_TRACES); - + tokens = new Semaphore(DEFAULT_MAX_SPANS); + traces = new ArrayBlockingQueue>(DEFAULT_MAX_SPANS); + + api = new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT); + asyncWriterThread = new Thread(new SpansSendingTask(), "dd.DDAgentWriter-SpansSendingTask"); asyncWriterThread.setDaemon(true); asyncWriterThread.start(); } - public void write(Span span) { - try{ - //Try to add a new span in the queue - commandQueue.add(span); - }catch(IllegalStateException e){ + public void write(List trace) { + //Try to add a new span in the queue + boolean proceed = tokens.tryAcquire(trace.size()); + + if(proceed){ + traces.add(trace); + }else{ //It was not possible to add the span the queue is full! //FIXME proper logging - System.out.println("Cannot add the following span as the async queue is full: "+span); + System.out.println("Cannot add the following trace as the async queue is full: "+trace); } } @@ -47,24 +56,37 @@ public class DDAgentWriter implements Writer { } protected class SpansSendingTask implements Runnable { + + protected final List> payload = new ArrayList>(); + public void run() { while (true) { try { - //Wait until a new span comes - Span span = commandQueue.take(); - + //WAIT until a new span comes + payload.add(traces.take()); + //Drain all spans up to a certain batch suze - List spans = new ArrayList(); - spans.add(span); - commandQueue.drainTo(spans, DEFAULT_BATCH_SIZE); + traces.drainTo(payload, DEFAULT_BATCH_SIZE); + + //SEND the payload to the agent + api.sendTraces(payload); + + //Compute the number of spans sent + int spansCount = 0; + for(List trace:payload){ + spansCount+=trace.size(); + } - //Then write to the agent - System.out.println(spans); + //Force garbage collect of the payload + payload.clear(); + + //Release the tokens + tokens.release(spansCount); } catch (InterruptedException e) { // TODO Auto-generated catch block // FIXME proper logging e.printStackTrace(); - + //The thread was interrupted, we break the LOOP break; } diff --git a/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java b/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java index 6fa76db9de..e65d1beb05 100644 --- a/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java +++ b/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java @@ -46,7 +46,7 @@ public class DDApi { return true; }else{ - //FIXME log status here + //FIXME log issue here return false; } From 99977a036e43dabce15a9c4cacdbef9cf07f9359 Mon Sep 17 00:00:00 2001 From: renaudboutet Date: Wed, 26 Apr 2017 20:33:19 +0200 Subject: [PATCH 2/2] Small fixes --- .../java/com/datadoghq/trace/impl/DDSpan.java | 281 +++++++++--------- .../java/com/datadoghq/trace/impl/Tracer.java | 4 +- .../trace/writer/impl/DDAgentWriter.java | 11 + .../datadoghq/trace/writer/impl/DDApi.java | 14 +- src/test/java/Example.java | 35 ++- .../com/datadoghq/trace/impl/DDSpanTest.java | 9 +- 6 files changed, 185 insertions(+), 169 deletions(-) diff --git a/src/main/java/com/datadoghq/trace/impl/DDSpan.java b/src/main/java/com/datadoghq/trace/impl/DDSpan.java index 963ba680bb..c314c16ec3 100644 --- a/src/main/java/com/datadoghq/trace/impl/DDSpan.java +++ b/src/main/java/com/datadoghq/trace/impl/DDSpan.java @@ -1,11 +1,11 @@ package com.datadoghq.trace.impl; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonIgnore; import io.opentracing.Span; import io.opentracing.SpanContext; @@ -13,176 +13,181 @@ import io.opentracing.SpanContext; public class DDSpan implements io.opentracing.Span { - protected final Tracer tracer; - protected String operationName; - protected Map tags; - protected long startTime; - protected long startTimeNano; // Only used to measure nano time durations - protected long durationNano; - protected final DDSpanContext context; - protected final LinkedHashSet traces; + protected final Tracer tracer; + protected String operationName; + protected Map tags; + protected long startTime; + protected long startTimeNano; // Only used to measure nano time durations + protected long durationNano; + protected final DDSpanContext context; + protected final LinkedHashSet traces; - DDSpan( - Tracer tracer, - String operationName, - LinkedHashSet traces, - Map tags, - Long timestamp, - DDSpanContext context) { + DDSpan( + Tracer tracer, + String operationName, + LinkedHashSet traces, + Map tags, + Long timestamp, + DDSpanContext context) { - this.tracer = tracer; - this.operationName = operationName; - this.traces = Optional.ofNullable(traces).orElse(new LinkedHashSet<>()); - this.tags = tags; - this.startTime = System.currentTimeMillis() * 1000000; - this.startTimeNano = System.nanoTime(); - this.context = context; + this.tracer = tracer; + this.operationName = operationName; + this.traces = Optional.ofNullable(traces).orElse(new LinkedHashSet<>()); + this.tags = tags; + this.startTime = System.currentTimeMillis() * 1000000; + this.startTimeNano = System.nanoTime(); + this.context = context; - // track each span of the trace - this.traces.add(this); + // track each span of the trace + this.traces.add(this); + } - } + public SpanContext context() { + return this.context; + } - public SpanContext context() { - return this.context; - } + public void finish() { + this.durationNano = System.nanoTime() - startTimeNano; + afterFinish(); + } - public void finish() { - finish(System.nanoTime()); - } + public void finish(long stopTimeMicro) { + this.durationNano = stopTimeMicro * 1000L - startTime; + afterFinish(); + } - public void finish(long stopTimeMicro) { - this.durationNano = stopTimeMicro * 1000L - startTime; - if (this.isRootSpan()) { - this.traces.stream() - .filter(s -> ((DDSpanContext) s.context()).getSpanId() != ((DDSpanContext) this.context()).getSpanId()) - .forEach(s -> s.finish()); - } - } + protected void afterFinish(){ + if (this.isRootSpan()) { + this.traces.stream() + .filter(s -> ((DDSpanContext) s.context()).getSpanId() != ((DDSpanContext) this.context()).getSpanId()) + .forEach(s -> s.finish()); + } + } - public void close() { - this.finish(); - } + public void close() { + this.finish(); + } - private boolean isRootSpan() { - return context.getTraceId() == context.getSpanId(); - } + private boolean isRootSpan() { + return context.getTraceId() == context.getSpanId(); + } - public io.opentracing.Span setTag(String tag, String value) { - return this.setTag(tag, value); - } + public io.opentracing.Span setTag(String tag, String value) { + return this.setTag(tag, value); + } - public Span setTag(String tag, boolean value) { - return this.setTag(tag, value); - } + public Span setTag(String tag, boolean value) { + return this.setTag(tag, value); + } - public Span setTag(String tag, Number value) { - return this.setTag(tag, (Object) value); - } + public Span setTag(String tag, Number value) { + return this.setTag(tag, (Object) value); + } - private Span setTag(String tag, Object value) { - this.tags.put(tag, value); - return this; - } + private Span setTag(String tag, Object value) { + this.tags.put(tag, value); + return this; + } - public Span log(Map map) { - return null; - } + public Span log(Map map) { + return null; + } - public Span log(long l, Map map) { - return null; - } + public Span log(long l, Map map) { + return null; + } - public Span log(String s) { - return null; - } + public Span log(String s) { + return null; + } - public Span log(long l, String s) { - return null; - } + public Span log(long l, String s) { + return null; + } - public Span setBaggageItem(String key, String value) { - this.context.setBaggageItem(key, value); - return this; - } + public Span setBaggageItem(String key, String value) { + this.context.setBaggageItem(key, value); + return this; + } - public String getBaggageItem(String key) { - return this.context.getBaggageItem(key); - } + public String getBaggageItem(String key) { + return this.context.getBaggageItem(key); + } - public Span setOperationName(String operationName) { - // FIXME: @renaud, the operationName (mandatory) is always set by the constructor - // FIXME: should be an UnsupportedOperation if we don't want to update the operationName + final - if (this.operationName != null) { - throw new IllegalArgumentException("The operationName is already assigned."); - } - this.operationName = operationName; - return this; - } + public Span setOperationName(String operationName) { + // FIXME: @renaud, the operationName (mandatory) is always set by the constructor + // FIXME: should be an UnsupportedOperation if we don't want to update the operationName + final + if (this.operationName != null) { + throw new IllegalArgumentException("The operationName is already assigned."); + } + this.operationName = operationName; + return this; + } - public Span log(String s, Object o) { - return null; - } + public Span log(String s, Object o) { + return null; + } - public Span log(long l, String s, Object o) { - return null; - } + public Span log(long l, String s, Object o) { + return null; + } - //Getters and JSON serialisation instructions + //Getters and JSON serialisation instructions - @JsonGetter(value = "name") - public String getOperationName() { - return operationName; - } + @JsonGetter(value = "name") + public String getOperationName() { + return operationName; + } - @JsonGetter(value = "meta") - public Map getTags() { - return this.tags; - } + @JsonGetter(value = "meta") + public Map getTags() { + return this.tags; + } - @JsonGetter(value = "start") - public long getStartTime() { - return startTime; - } + @JsonGetter(value = "start") + public long getStartTime() { + return startTime; + } - @JsonGetter(value = "duration") - public long getDurationNano() { - return durationNano; - } + @JsonGetter(value = "duration") + public long getDurationNano() { + return durationNano; + } - public String getService() { - return context.getServiceName(); - } + public String getService() { + return context.getServiceName(); + } - @JsonGetter(value = "trace_id") - public long getTraceId() { - return context.getTraceId(); - } + @JsonGetter(value = "trace_id") + public long getTraceId() { + return context.getTraceId(); + } - @JsonGetter(value = "span_id") - public long getSpanId() { - return context.getSpanId(); - } + @JsonGetter(value = "span_id") + public long getSpanId() { + return context.getSpanId(); + } - @JsonGetter(value = "parent_id") - public long getParentId() { - return context.getParentId(); - } + @JsonGetter(value = "parent_id") + public long getParentId() { + return context.getParentId(); + } - @JsonGetter(value = "resource") - public String getResourceName() { - return context.getResourceName() == null ? getOperationName() : context.getResourceName(); - } + @JsonGetter(value = "resource") + public String getResourceName() { + return context.getResourceName() == null ? getOperationName() : context.getResourceName(); + } - public String getType() { - return context.getSpanType(); - } + public String getType() { + return context.getSpanType(); + } - public int getError() { - return context.getErrorFlag() ? 1 : 0; - } + public int getError() { + return context.getErrorFlag() ? 1 : 0; + } - public LinkedHashSet getTraces() { - return traces; - } + @JsonIgnore + public LinkedHashSet getTraces() { + return traces; + } } diff --git a/src/main/java/com/datadoghq/trace/impl/Tracer.java b/src/main/java/com/datadoghq/trace/impl/Tracer.java index 07b4fa15bb..641fe5add6 100644 --- a/src/main/java/com/datadoghq/trace/impl/Tracer.java +++ b/src/main/java/com/datadoghq/trace/impl/Tracer.java @@ -130,12 +130,12 @@ public class Tracer implements io.opentracing.Tracer { p.getTraceId(), generatedId, p.getSpanId(), - Optional.ofNullable(p.getServiceName()).orElse(this.serviceName), + Optional.ofNullable(this.serviceName).orElse(p.getServiceName()), Optional.ofNullable(this.resourceName).orElse(this.operationName), p.getBaggageItems(), errorFlag, null, - this.spanType, + Optional.ofNullable(this.spanType).orElse(p.getSpanType()), true ); } else { diff --git a/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java index 4c6cff9b59..6c0970d58b 100644 --- a/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java +++ b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java @@ -53,6 +53,11 @@ public class DDAgentWriter implements Writer { public void close() { asyncWriterThread.interrupt(); + try { + asyncWriterThread.join(); + } catch (InterruptedException e) { + //Nothing to log as we expect and interrupted exception + } } protected class SpansSendingTask implements Runnable { @@ -64,6 +69,9 @@ public class DDAgentWriter implements Writer { try { //WAIT until a new span comes payload.add(traces.take()); + + //FIXME proper logging + System.out.println("Start writing traces"); //Drain all spans up to a certain batch suze traces.drainTo(payload, DEFAULT_BATCH_SIZE); @@ -77,6 +85,9 @@ public class DDAgentWriter implements Writer { spansCount+=trace.size(); } + //FIXME proper logging + System.out.println("Sent "+spansCount+" spans through "+payload.size()+" traces"); + //Force garbage collect of the payload payload.clear(); diff --git a/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java b/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java index e65d1beb05..e498d310aa 100644 --- a/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java +++ b/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java @@ -113,15 +113,13 @@ public class DDApi { Thread.sleep(1000); parent.finish(); - - List> traces = new ArrayList>(); - traces.add(array); - - DDApi api = new DDApi(DDAgentWriter.DEFAULT_HOSTNAME, DDAgentWriter.DEFAULT_PORT); -// String service = "{\"service_name\": {\"app\": \"service-name\",\"app_type\": \"web\"}}"; -// System.out.println("Pushed service: "+api.callPUT(api.servicesEndpoint, service)); - System.out.println("Pushed traces: "+api.sendTraces(traces)); + DDAgentWriter writer = new DDAgentWriter(); + writer.write(array); + + Thread.sleep(1000); + + writer.close(); } } diff --git a/src/test/java/Example.java b/src/test/java/Example.java index 8553b8c4ff..baf5d2f607 100644 --- a/src/test/java/Example.java +++ b/src/test/java/Example.java @@ -1,35 +1,42 @@ +import java.util.ArrayList; +import java.util.List; + import com.datadoghq.trace.Writer; import com.datadoghq.trace.impl.Tracer; import com.datadoghq.trace.writer.impl.DDAgentWriter; +import com.fasterxml.jackson.databind.ObjectMapper; + import io.opentracing.Span; public class Example { - public static void main(String[] args) { - - - Tracer tracer = new Tracer(); + public static void main(String[] args) throws Exception{ + List trace = new ArrayList(); + + Tracer tracer = new Tracer(); Writer writer = new DDAgentWriter(); Span parent = tracer .buildSpan("hello-world") .withServiceName("service-name") + .withSpanType("web") .start(); parent.setBaggageItem("a-baggage", "value"); + trace.add(parent); parent.finish(); - Tracer.SpanBuilder builder = (Tracer.SpanBuilder) tracer + Span child = tracer .buildSpan("hello-world") - .asChildOf(parent); - - Span child = builder - .withServiceName("service-name") - .start(); - - - - + .asChildOf(parent) + .withResourceName("resource-name") + .start(); + child.finish(); + trace.add(child); + parent.finish(); + + writer.write(trace); + writer.close(); } diff --git a/src/test/java/com/datadoghq/trace/impl/DDSpanTest.java b/src/test/java/com/datadoghq/trace/impl/DDSpanTest.java index 7dbdd244f9..ec17abbc80 100644 --- a/src/test/java/com/datadoghq/trace/impl/DDSpanTest.java +++ b/src/test/java/com/datadoghq/trace/impl/DDSpanTest.java @@ -1,13 +1,8 @@ package com.datadoghq.trace.impl; -import io.opentracing.Span; -import org.junit.Test; - -import java.util.Optional; - import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.floatThat; -import static org.mockito.Mockito.mock; + +import org.junit.Test; public class DDSpanTest {