diff --git a/src/main/java/com/datadoghq/trace/Writer.java b/src/main/java/com/datadoghq/trace/Writer.java index ebf0360422..afde6d4264 100644 --- a/src/main/java/com/datadoghq/trace/Writer.java +++ b/src/main/java/com/datadoghq/trace/Writer.java @@ -1,10 +1,17 @@ package com.datadoghq.trace; +import java.util.List; + import io.opentracing.Span; public interface Writer { - public void write(Span span); + /** + * Write a trace represented by the entire list of all the finished spans + * + * @param trace + */ + public void write(List trace); public void close(); } diff --git a/src/main/java/com/datadoghq/trace/impl/DDSpan.java b/src/main/java/com/datadoghq/trace/impl/DDSpan.java index f750e08311..c314c16ec3 100644 --- a/src/main/java/com/datadoghq/trace/impl/DDSpan.java +++ b/src/main/java/com/datadoghq/trace/impl/DDSpan.java @@ -1,190 +1,193 @@ package com.datadoghq.trace.impl; -import java.time.Clock; -import java.util.*; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Optional; import com.fasterxml.jackson.annotation.JsonGetter; - import com.fasterxml.jackson.annotation.JsonIgnore; + import io.opentracing.Span; import io.opentracing.SpanContext; public class DDSpan implements io.opentracing.Span { - protected final Tracer tracer; - protected String operationName; - protected Map tags; - protected long startTimeNano; - protected long durationNano; - protected final DDSpanContext context; - protected final ArrayList trace; + protected final Tracer tracer; + protected String operationName; + protected Map tags; + protected long startTime; + protected long startTimeNano; // Only used to measure nano time durations + protected long durationNano; + protected final DDSpanContext context; + protected final LinkedHashSet traces; - DDSpan( - Tracer tracer, - String operationName, - ArrayList trace, - Map tags, - Long timestampMilliseconds, - DDSpanContext context) { + DDSpan( + Tracer tracer, + String operationName, + LinkedHashSet traces, + Map tags, + Long timestamp, + DDSpanContext context) { - this.tracer = tracer; - this.operationName = operationName; - this.trace = Optional.ofNullable(trace).orElse(new ArrayList<>()); - this.tags = tags; - this.startTimeNano = Optional.ofNullable(timestampMilliseconds).orElse(Clock.systemUTC().millis()) * 1000000L; - this.context = context; + this.tracer = tracer; + this.operationName = operationName; + this.traces = Optional.ofNullable(traces).orElse(new LinkedHashSet<>()); + this.tags = tags; + this.startTime = System.currentTimeMillis() * 1000000; + this.startTimeNano = System.nanoTime(); + this.context = context; - // track each span of the trace - this.trace.add(this); + // track each span of the trace + this.traces.add(this); + } - } + public SpanContext context() { + return this.context; + } - public SpanContext context() { - return this.context; - } + public void finish() { + this.durationNano = System.nanoTime() - startTimeNano; + afterFinish(); + } - public void finish() { - finish(Clock.systemUTC().millis()); - } + public void finish(long stopTimeMicro) { + this.durationNano = stopTimeMicro * 1000L - startTime; + afterFinish(); + } - public void finish(long stopTimeMillis) { - this.durationNano = (stopTimeMillis * 1000000L - startTimeNano); - if (this.isRootSpan()) { - this.trace.stream() - .filter(s -> { - boolean isSelf = ((DDSpanContext) s.context()).getSpanId() == ((DDSpanContext) this.context()).getSpanId(); - boolean isFinished = ((DDSpan) s).getDurationNano() != 0L; - return !isSelf && !isFinished; - }) - .forEach(Span::finish); - } - } + protected void afterFinish(){ + if (this.isRootSpan()) { + this.traces.stream() + .filter(s -> ((DDSpanContext) s.context()).getSpanId() != ((DDSpanContext) this.context()).getSpanId()) + .forEach(s -> s.finish()); + } + } - public void close() { - this.finish(); - } + public void close() { + this.finish(); + } - private boolean isRootSpan() { - return context.getTraceId() == context.getSpanId(); - } + private boolean isRootSpan() { + return context.getTraceId() == context.getSpanId(); + } - public io.opentracing.Span setTag(String tag, String value) { - return this.setTag(tag, value); - } + public io.opentracing.Span setTag(String tag, String value) { + return this.setTag(tag, value); + } - public Span setTag(String tag, boolean value) { - return this.setTag(tag, value); - } + public Span setTag(String tag, boolean value) { + return this.setTag(tag, value); + } - public Span setTag(String tag, Number value) { - return this.setTag(tag, (Object) value); - } + public Span setTag(String tag, Number value) { + return this.setTag(tag, (Object) value); + } - private Span setTag(String tag, Object value) { - this.tags.put(tag, value); - return this; - } + private Span setTag(String tag, Object value) { + this.tags.put(tag, value); + return this; + } - public Span log(Map map) { - return null; - } + public Span log(Map map) { + return null; + } - public Span log(long l, Map map) { - return null; - } + public Span log(long l, Map map) { + return null; + } - public Span log(String s) { - return null; - } + public Span log(String s) { + return null; + } - public Span log(long l, String s) { - return null; - } + public Span log(long l, String s) { + return null; + } - public Span setBaggageItem(String key, String value) { - this.context.setBaggageItem(key, value); - return this; - } + public Span setBaggageItem(String key, String value) { + this.context.setBaggageItem(key, value); + return this; + } - public String getBaggageItem(String key) { - return this.context.getBaggageItem(key); - } + public String getBaggageItem(String key) { + return this.context.getBaggageItem(key); + } - public Span setOperationName(String operationName) { - // FIXME: @renaud, the operationName (mandatory) is always set by the constructor - // FIXME: should be an UnsupportedOperation if we don't want to update the operationName + final - if (this.operationName != null) { - throw new IllegalArgumentException("The operationName is already assigned."); - } - this.operationName = operationName; - return this; - } + public Span setOperationName(String operationName) { + // FIXME: @renaud, the operationName (mandatory) is always set by the constructor + // FIXME: should be an UnsupportedOperation if we don't want to update the operationName + final + if (this.operationName != null) { + throw new IllegalArgumentException("The operationName is already assigned."); + } + this.operationName = operationName; + return this; + } - public Span log(String s, Object o) { - return null; - } + public Span log(String s, Object o) { + return null; + } - public Span log(long l, String s, Object o) { - return null; - } + public Span log(long l, String s, Object o) { + return null; + } - //Getters and JSON serialisation instructions + //Getters and JSON serialisation instructions - @JsonGetter(value = "name") - public String getOperationName() { - return operationName; - } + @JsonGetter(value = "name") + public String getOperationName() { + return operationName; + } - @JsonGetter(value = "meta") - public Map getTags() { - return this.tags; - } + @JsonGetter(value = "meta") + public Map getTags() { + return this.tags; + } - @JsonGetter(value = "start") - public long getStartTime() { - return startTimeNano; - } + @JsonGetter(value = "start") + public long getStartTime() { + return startTime; + } - @JsonGetter(value = "duration") - public long getDurationNano() { - return durationNano; - } + @JsonGetter(value = "duration") + public long getDurationNano() { + return durationNano; + } - public String getService() { - return context.getServiceName(); - } + public String getService() { + return context.getServiceName(); + } - @JsonGetter(value = "trace_id") - public long getTraceId() { - return context.getTraceId(); - } + @JsonGetter(value = "trace_id") + public long getTraceId() { + return context.getTraceId(); + } - @JsonGetter(value = "span_id") - public long getSpanId() { - return context.getSpanId(); - } + @JsonGetter(value = "span_id") + public long getSpanId() { + return context.getSpanId(); + } - @JsonGetter(value = "parent_id") - public long getParentId() { - return context.getParentId(); - } + @JsonGetter(value = "parent_id") + public long getParentId() { + return context.getParentId(); + } - @JsonGetter(value = "resource") - public String getResourceName() { - return context.getResourceName() == null ? getOperationName() : context.getResourceName(); - } + @JsonGetter(value = "resource") + public String getResourceName() { + return context.getResourceName() == null ? getOperationName() : context.getResourceName(); + } - public String getType() { - return context.getSpanType(); - } + public String getType() { + return context.getSpanType(); + } - public int getError() { - return context.getErrorFlag() ? 1 : 0; - } + public int getError() { + return context.getErrorFlag() ? 1 : 0; + } - @JsonIgnore - public ArrayList getTrace() { - return trace; - } + @JsonIgnore + public LinkedHashSet getTraces() { + return traces; + } } diff --git a/src/main/java/com/datadoghq/trace/impl/Tracer.java b/src/main/java/com/datadoghq/trace/impl/Tracer.java index 615f9e5fa7..81d598fc96 100644 --- a/src/main/java/com/datadoghq/trace/impl/Tracer.java +++ b/src/main/java/com/datadoghq/trace/impl/Tracer.java @@ -130,12 +130,12 @@ public class Tracer implements io.opentracing.Tracer { p.getTraceId(), generatedId, p.getSpanId(), - Optional.ofNullable(p.getServiceName()).orElse(this.serviceName), + Optional.ofNullable(this.serviceName).orElse(p.getServiceName()), Optional.ofNullable(this.resourceName).orElse(this.operationName), p.getBaggageItems(), errorFlag, null, - this.spanType, + Optional.ofNullable(this.spanType).orElse(p.getSpanType()), true ); } else { diff --git a/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java index 71e1fa453a..6c0970d58b 100644 --- a/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java +++ b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java @@ -4,6 +4,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Semaphore; import com.datadoghq.trace.Writer; @@ -13,58 +14,90 @@ public class DDAgentWriter implements Writer { protected static final String DEFAULT_HOSTNAME = "localhost"; protected static final int DEFAULT_PORT = 8126; - - protected static final int DEFAULT_MAX_TRACES = 1000; + + protected static final int DEFAULT_MAX_SPANS = 1000; protected static final int DEFAULT_BATCH_SIZE = 10; protected static final int DEFAULT_MAX_SERVICES = 1000; protected static final long DEFAULT_TIMEOUT = 5000; - protected final BlockingQueue commandQueue; + private final Semaphore tokens; + protected final BlockingQueue> traces; protected final Thread asyncWriterThread; + protected final DDApi api; + public DDAgentWriter() { super(); - commandQueue = new ArrayBlockingQueue(DEFAULT_MAX_TRACES); - + tokens = new Semaphore(DEFAULT_MAX_SPANS); + traces = new ArrayBlockingQueue>(DEFAULT_MAX_SPANS); + + api = new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT); + asyncWriterThread = new Thread(new SpansSendingTask(), "dd.DDAgentWriter-SpansSendingTask"); asyncWriterThread.setDaemon(true); asyncWriterThread.start(); } - public void write(Span span) { - try{ - //Try to add a new span in the queue - commandQueue.add(span); - }catch(IllegalStateException e){ + public void write(List trace) { + //Try to add a new span in the queue + boolean proceed = tokens.tryAcquire(trace.size()); + + if(proceed){ + traces.add(trace); + }else{ //It was not possible to add the span the queue is full! //FIXME proper logging - System.out.println("Cannot add the following span as the async queue is full: "+span); + System.out.println("Cannot add the following trace as the async queue is full: "+trace); } } public void close() { asyncWriterThread.interrupt(); + try { + asyncWriterThread.join(); + } catch (InterruptedException e) { + //Nothing to log as we expect and interrupted exception + } } protected class SpansSendingTask implements Runnable { + + protected final List> payload = new ArrayList>(); + public void run() { while (true) { try { - //Wait until a new span comes - Span span = commandQueue.take(); + //WAIT until a new span comes + payload.add(traces.take()); + //FIXME proper logging + System.out.println("Start writing traces"); + //Drain all spans up to a certain batch suze - List spans = new ArrayList(); - spans.add(span); - commandQueue.drainTo(spans, DEFAULT_BATCH_SIZE); + traces.drainTo(payload, DEFAULT_BATCH_SIZE); + + //SEND the payload to the agent + api.sendTraces(payload); + + //Compute the number of spans sent + int spansCount = 0; + for(List trace:payload){ + spansCount+=trace.size(); + } - //Then write to the agent - System.out.println(spans); + //FIXME proper logging + System.out.println("Sent "+spansCount+" spans through "+payload.size()+" traces"); + + //Force garbage collect of the payload + payload.clear(); + + //Release the tokens + tokens.release(spansCount); } catch (InterruptedException e) { // TODO Auto-generated catch block // FIXME proper logging e.printStackTrace(); - + //The thread was interrupted, we break the LOOP break; } diff --git a/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java b/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java index d03f78f22e..55c6dab4fe 100644 --- a/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java +++ b/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java @@ -16,7 +16,7 @@ import io.opentracing.Span; public class DDApi { - protected static final String TRACES_ENDPOINT = "/v0.3/trace"; + protected static final String TRACES_ENDPOINT = "/v0.3/traces"; protected static final String SERVICES_ENDPOINT = "/v0.3/services"; protected final String host; @@ -114,14 +114,12 @@ public class DDApi { parent.finish(); - List> traces = new ArrayList>(); - traces.add(array); + DDAgentWriter writer = new DDAgentWriter(); + writer.write(array); - DDApi api = new DDApi(DDAgentWriter.DEFAULT_HOSTNAME, DDAgentWriter.DEFAULT_PORT); + Thread.sleep(1000); -// String service = "{\"service_name\": {\"app\": \"service-name\",\"app_type\": \"web\"}}"; -// System.out.println("Pushed service: "+api.callPUT(api.servicesEndpoint, service)); - System.out.println("Pushed trace: "+api.sendTraces(traces)); + writer.close(); } } diff --git a/src/test/java/Example.java b/src/test/java/Example.java index 8553b8c4ff..baf5d2f607 100644 --- a/src/test/java/Example.java +++ b/src/test/java/Example.java @@ -1,35 +1,42 @@ +import java.util.ArrayList; +import java.util.List; + import com.datadoghq.trace.Writer; import com.datadoghq.trace.impl.Tracer; import com.datadoghq.trace.writer.impl.DDAgentWriter; +import com.fasterxml.jackson.databind.ObjectMapper; + import io.opentracing.Span; public class Example { - public static void main(String[] args) { - - - Tracer tracer = new Tracer(); + public static void main(String[] args) throws Exception{ + List trace = new ArrayList(); + + Tracer tracer = new Tracer(); Writer writer = new DDAgentWriter(); Span parent = tracer .buildSpan("hello-world") .withServiceName("service-name") + .withSpanType("web") .start(); parent.setBaggageItem("a-baggage", "value"); + trace.add(parent); parent.finish(); - Tracer.SpanBuilder builder = (Tracer.SpanBuilder) tracer + Span child = tracer .buildSpan("hello-world") - .asChildOf(parent); - - Span child = builder - .withServiceName("service-name") - .start(); - - - - + .asChildOf(parent) + .withResourceName("resource-name") + .start(); + child.finish(); + trace.add(child); + parent.finish(); + + writer.write(trace); + writer.close(); }