Merge remote-tracking branch 'origin/dev' into dev

# Conflicts:
#	src/main/java/com/datadoghq/trace/impl/DDSpan.java
#	src/main/java/com/datadoghq/trace/writer/impl/DDApi.java
#	src/test/java/com/datadoghq/trace/impl/DDSpanTest.java
This commit is contained in:
Guillaume Polaert 2017-04-27 09:45:37 +02:00
commit d364d10008
6 changed files with 234 additions and 186 deletions

View File

@ -1,10 +1,17 @@
package com.datadoghq.trace; package com.datadoghq.trace;
import java.util.List;
import io.opentracing.Span; import io.opentracing.Span;
public interface Writer { public interface Writer {
public void write(Span span); /**
* Write a trace represented by the entire list of all the finished spans
*
* @param trace
*/
public void write(List<Span> trace);
public void close(); public void close();
} }

View File

@ -1,190 +1,193 @@
package com.datadoghq.trace.impl; package com.datadoghq.trace.impl;
import java.time.Clock; import java.util.LinkedHashSet;
import java.util.*; import java.util.Map;
import java.util.Optional;
import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.SpanContext; import io.opentracing.SpanContext;
public class DDSpan implements io.opentracing.Span { public class DDSpan implements io.opentracing.Span {
protected final Tracer tracer; protected final Tracer tracer;
protected String operationName; protected String operationName;
protected Map<String, Object> tags; protected Map<String, Object> tags;
protected long startTimeNano; protected long startTime;
protected long durationNano; protected long startTimeNano; // Only used to measure nano time durations
protected final DDSpanContext context; protected long durationNano;
protected final ArrayList<Span> trace; protected final DDSpanContext context;
protected final LinkedHashSet<Span> traces;
DDSpan( DDSpan(
Tracer tracer, Tracer tracer,
String operationName, String operationName,
ArrayList<Span> trace, LinkedHashSet<Span> traces,
Map<String, Object> tags, Map<String, Object> tags,
Long timestampMilliseconds, Long timestamp,
DDSpanContext context) { DDSpanContext context) {
this.tracer = tracer; this.tracer = tracer;
this.operationName = operationName; this.operationName = operationName;
this.trace = Optional.ofNullable(trace).orElse(new ArrayList<>()); this.traces = Optional.ofNullable(traces).orElse(new LinkedHashSet<>());
this.tags = tags; this.tags = tags;
this.startTimeNano = Optional.ofNullable(timestampMilliseconds).orElse(Clock.systemUTC().millis()) * 1000000L; this.startTime = System.currentTimeMillis() * 1000000;
this.context = context; this.startTimeNano = System.nanoTime();
this.context = context;
// track each span of the trace // track each span of the trace
this.trace.add(this); this.traces.add(this);
}
} public SpanContext context() {
return this.context;
}
public SpanContext context() { public void finish() {
return this.context; this.durationNano = System.nanoTime() - startTimeNano;
} afterFinish();
}
public void finish() { public void finish(long stopTimeMicro) {
finish(Clock.systemUTC().millis()); this.durationNano = stopTimeMicro * 1000L - startTime;
} afterFinish();
}
public void finish(long stopTimeMillis) { protected void afterFinish(){
this.durationNano = (stopTimeMillis * 1000000L - startTimeNano); if (this.isRootSpan()) {
if (this.isRootSpan()) { this.traces.stream()
this.trace.stream() .filter(s -> ((DDSpanContext) s.context()).getSpanId() != ((DDSpanContext) this.context()).getSpanId())
.filter(s -> { .forEach(s -> s.finish());
boolean isSelf = ((DDSpanContext) s.context()).getSpanId() == ((DDSpanContext) this.context()).getSpanId(); }
boolean isFinished = ((DDSpan) s).getDurationNano() != 0L; }
return !isSelf && !isFinished;
})
.forEach(Span::finish);
}
}
public void close() { public void close() {
this.finish(); this.finish();
} }
private boolean isRootSpan() { private boolean isRootSpan() {
return context.getTraceId() == context.getSpanId(); return context.getTraceId() == context.getSpanId();
} }
public io.opentracing.Span setTag(String tag, String value) { public io.opentracing.Span setTag(String tag, String value) {
return this.setTag(tag, value); return this.setTag(tag, value);
} }
public Span setTag(String tag, boolean value) { public Span setTag(String tag, boolean value) {
return this.setTag(tag, value); return this.setTag(tag, value);
} }
public Span setTag(String tag, Number value) { public Span setTag(String tag, Number value) {
return this.setTag(tag, (Object) value); return this.setTag(tag, (Object) value);
} }
private Span setTag(String tag, Object value) { private Span setTag(String tag, Object value) {
this.tags.put(tag, value); this.tags.put(tag, value);
return this; return this;
} }
public Span log(Map<String, ?> map) { public Span log(Map<String, ?> map) {
return null; return null;
} }
public Span log(long l, Map<String, ?> map) { public Span log(long l, Map<String, ?> map) {
return null; return null;
} }
public Span log(String s) { public Span log(String s) {
return null; return null;
} }
public Span log(long l, String s) { public Span log(long l, String s) {
return null; return null;
} }
public Span setBaggageItem(String key, String value) { public Span setBaggageItem(String key, String value) {
this.context.setBaggageItem(key, value); this.context.setBaggageItem(key, value);
return this; return this;
} }
public String getBaggageItem(String key) { public String getBaggageItem(String key) {
return this.context.getBaggageItem(key); return this.context.getBaggageItem(key);
} }
public Span setOperationName(String operationName) { public Span setOperationName(String operationName) {
// FIXME: @renaud, the operationName (mandatory) is always set by the constructor // FIXME: @renaud, the operationName (mandatory) is always set by the constructor
// FIXME: should be an UnsupportedOperation if we don't want to update the operationName + final // FIXME: should be an UnsupportedOperation if we don't want to update the operationName + final
if (this.operationName != null) { if (this.operationName != null) {
throw new IllegalArgumentException("The operationName is already assigned."); throw new IllegalArgumentException("The operationName is already assigned.");
} }
this.operationName = operationName; this.operationName = operationName;
return this; return this;
} }
public Span log(String s, Object o) { public Span log(String s, Object o) {
return null; return null;
} }
public Span log(long l, String s, Object o) { public Span log(long l, String s, Object o) {
return null; return null;
} }
//Getters and JSON serialisation instructions //Getters and JSON serialisation instructions
@JsonGetter(value = "name") @JsonGetter(value = "name")
public String getOperationName() { public String getOperationName() {
return operationName; return operationName;
} }
@JsonGetter(value = "meta") @JsonGetter(value = "meta")
public Map<String, Object> getTags() { public Map<String, Object> getTags() {
return this.tags; return this.tags;
} }
@JsonGetter(value = "start") @JsonGetter(value = "start")
public long getStartTime() { public long getStartTime() {
return startTimeNano; return startTime;
} }
@JsonGetter(value = "duration") @JsonGetter(value = "duration")
public long getDurationNano() { public long getDurationNano() {
return durationNano; return durationNano;
} }
public String getService() { public String getService() {
return context.getServiceName(); return context.getServiceName();
} }
@JsonGetter(value = "trace_id") @JsonGetter(value = "trace_id")
public long getTraceId() { public long getTraceId() {
return context.getTraceId(); return context.getTraceId();
} }
@JsonGetter(value = "span_id") @JsonGetter(value = "span_id")
public long getSpanId() { public long getSpanId() {
return context.getSpanId(); return context.getSpanId();
} }
@JsonGetter(value = "parent_id") @JsonGetter(value = "parent_id")
public long getParentId() { public long getParentId() {
return context.getParentId(); return context.getParentId();
} }
@JsonGetter(value = "resource") @JsonGetter(value = "resource")
public String getResourceName() { public String getResourceName() {
return context.getResourceName() == null ? getOperationName() : context.getResourceName(); return context.getResourceName() == null ? getOperationName() : context.getResourceName();
} }
public String getType() { public String getType() {
return context.getSpanType(); return context.getSpanType();
} }
public int getError() { public int getError() {
return context.getErrorFlag() ? 1 : 0; return context.getErrorFlag() ? 1 : 0;
} }
@JsonIgnore @JsonIgnore
public ArrayList<Span> getTrace() { public LinkedHashSet<Span> getTraces() {
return trace; return traces;
} }
} }

View File

@ -130,12 +130,12 @@ public class Tracer implements io.opentracing.Tracer {
p.getTraceId(), p.getTraceId(),
generatedId, generatedId,
p.getSpanId(), p.getSpanId(),
Optional.ofNullable(p.getServiceName()).orElse(this.serviceName), Optional.ofNullable(this.serviceName).orElse(p.getServiceName()),
Optional.ofNullable(this.resourceName).orElse(this.operationName), Optional.ofNullable(this.resourceName).orElse(this.operationName),
p.getBaggageItems(), p.getBaggageItems(),
errorFlag, errorFlag,
null, null,
this.spanType, Optional.ofNullable(this.spanType).orElse(p.getSpanType()),
true true
); );
} else { } else {

View File

@ -4,6 +4,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import com.datadoghq.trace.Writer; import com.datadoghq.trace.Writer;
@ -14,52 +15,84 @@ public class DDAgentWriter implements Writer {
protected static final String DEFAULT_HOSTNAME = "localhost"; protected static final String DEFAULT_HOSTNAME = "localhost";
protected static final int DEFAULT_PORT = 8126; protected static final int DEFAULT_PORT = 8126;
protected static final int DEFAULT_MAX_TRACES = 1000; protected static final int DEFAULT_MAX_SPANS = 1000;
protected static final int DEFAULT_BATCH_SIZE = 10; protected static final int DEFAULT_BATCH_SIZE = 10;
protected static final int DEFAULT_MAX_SERVICES = 1000; protected static final int DEFAULT_MAX_SERVICES = 1000;
protected static final long DEFAULT_TIMEOUT = 5000; protected static final long DEFAULT_TIMEOUT = 5000;
protected final BlockingQueue<Span> commandQueue; private final Semaphore tokens;
protected final BlockingQueue<List<Span>> traces;
protected final Thread asyncWriterThread; protected final Thread asyncWriterThread;
protected final DDApi api;
public DDAgentWriter() { public DDAgentWriter() {
super(); super();
commandQueue = new ArrayBlockingQueue<Span>(DEFAULT_MAX_TRACES); tokens = new Semaphore(DEFAULT_MAX_SPANS);
traces = new ArrayBlockingQueue<List<Span>>(DEFAULT_MAX_SPANS);
api = new DDApi(DEFAULT_HOSTNAME, DEFAULT_PORT);
asyncWriterThread = new Thread(new SpansSendingTask(), "dd.DDAgentWriter-SpansSendingTask"); asyncWriterThread = new Thread(new SpansSendingTask(), "dd.DDAgentWriter-SpansSendingTask");
asyncWriterThread.setDaemon(true); asyncWriterThread.setDaemon(true);
asyncWriterThread.start(); asyncWriterThread.start();
} }
public void write(Span span) { public void write(List<Span> trace) {
try{ //Try to add a new span in the queue
//Try to add a new span in the queue boolean proceed = tokens.tryAcquire(trace.size());
commandQueue.add(span);
}catch(IllegalStateException e){ if(proceed){
traces.add(trace);
}else{
//It was not possible to add the span the queue is full! //It was not possible to add the span the queue is full!
//FIXME proper logging //FIXME proper logging
System.out.println("Cannot add the following span as the async queue is full: "+span); System.out.println("Cannot add the following trace as the async queue is full: "+trace);
} }
} }
public void close() { public void close() {
asyncWriterThread.interrupt(); asyncWriterThread.interrupt();
try {
asyncWriterThread.join();
} catch (InterruptedException e) {
//Nothing to log as we expect and interrupted exception
}
} }
protected class SpansSendingTask implements Runnable { protected class SpansSendingTask implements Runnable {
protected final List<List<Span>> payload = new ArrayList<List<Span>>();
public void run() { public void run() {
while (true) { while (true) {
try { try {
//Wait until a new span comes //WAIT until a new span comes
Span span = commandQueue.take(); payload.add(traces.take());
//FIXME proper logging
System.out.println("Start writing traces");
//Drain all spans up to a certain batch suze //Drain all spans up to a certain batch suze
List<Span> spans = new ArrayList<Span>(); traces.drainTo(payload, DEFAULT_BATCH_SIZE);
spans.add(span);
commandQueue.drainTo(spans, DEFAULT_BATCH_SIZE);
//Then write to the agent //SEND the payload to the agent
System.out.println(spans); api.sendTraces(payload);
//Compute the number of spans sent
int spansCount = 0;
for(List<Span> trace:payload){
spansCount+=trace.size();
}
//FIXME proper logging
System.out.println("Sent "+spansCount+" spans through "+payload.size()+" traces");
//Force garbage collect of the payload
payload.clear();
//Release the tokens
tokens.release(spansCount);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
// FIXME proper logging // FIXME proper logging

View File

@ -16,7 +16,7 @@ import io.opentracing.Span;
public class DDApi { public class DDApi {
protected static final String TRACES_ENDPOINT = "/v0.3/trace"; protected static final String TRACES_ENDPOINT = "/v0.3/traces";
protected static final String SERVICES_ENDPOINT = "/v0.3/services"; protected static final String SERVICES_ENDPOINT = "/v0.3/services";
protected final String host; protected final String host;
@ -114,14 +114,12 @@ public class DDApi {
parent.finish(); parent.finish();
List<List<Span>> traces = new ArrayList<List<Span>>(); DDAgentWriter writer = new DDAgentWriter();
traces.add(array); writer.write(array);
DDApi api = new DDApi(DDAgentWriter.DEFAULT_HOSTNAME, DDAgentWriter.DEFAULT_PORT); Thread.sleep(1000);
// String service = "{\"service_name\": {\"app\": \"service-name\",\"app_type\": \"web\"}}"; writer.close();
// System.out.println("Pushed service: "+api.callPUT(api.servicesEndpoint, service));
System.out.println("Pushed trace: "+api.sendTraces(traces));
} }
} }

View File

@ -1,34 +1,41 @@
import java.util.ArrayList;
import java.util.List;
import com.datadoghq.trace.Writer; import com.datadoghq.trace.Writer;
import com.datadoghq.trace.impl.Tracer; import com.datadoghq.trace.impl.Tracer;
import com.datadoghq.trace.writer.impl.DDAgentWriter; import com.datadoghq.trace.writer.impl.DDAgentWriter;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentracing.Span; import io.opentracing.Span;
public class Example { public class Example {
public static void main(String[] args) { public static void main(String[] args) throws Exception{
List<Span> trace = new ArrayList<Span>();
Tracer tracer = new Tracer();
Tracer tracer = new Tracer();
Writer writer = new DDAgentWriter(); Writer writer = new DDAgentWriter();
Span parent = tracer Span parent = tracer
.buildSpan("hello-world") .buildSpan("hello-world")
.withServiceName("service-name") .withServiceName("service-name")
.withSpanType("web")
.start(); .start();
parent.setBaggageItem("a-baggage", "value"); parent.setBaggageItem("a-baggage", "value");
trace.add(parent);
parent.finish(); parent.finish();
Tracer.SpanBuilder builder = (Tracer.SpanBuilder) tracer Span child = tracer
.buildSpan("hello-world") .buildSpan("hello-world")
.asChildOf(parent); .asChildOf(parent)
.withResourceName("resource-name")
Span child = builder .start();
.withServiceName("service-name") child.finish();
.start(); trace.add(child);
parent.finish();
writer.write(trace);
writer.close(); writer.close();