From 1a981bb9f30bf1c6bdbdac5e1e8d6342c6ca5d9f Mon Sep 17 00:00:00 2001 From: renaudboutet Date: Wed, 26 Apr 2017 10:33:26 +0200 Subject: [PATCH] Added writer, serialisation, API + changed some types --- .classpath | 26 ++++++ .project | 23 +++++ pom.xml | 93 ++++++++++--------- .../trace/{ISampler.java => Sampler.java} | 2 +- .../com/datadoghq/trace/SpanSerializer.java | 11 +++ .../trace/{IWriter.java => Writer.java} | 4 +- .../trace/impl/DDSpanSerializer.java | 36 +++++++ .../java/com/datadoghq/trace/impl/Span.java | 8 +- .../java/com/datadoghq/trace/impl/Tracer.java | 14 +-- .../trace/writer/impl/DDAgentWriter.java | 73 +++++++++++++++ .../datadoghq/trace/writer/impl/DDApi.java | 59 ++++++++++++ 11 files changed, 293 insertions(+), 56 deletions(-) create mode 100644 .classpath create mode 100644 .project rename src/main/java/com/datadoghq/trace/{ISampler.java => Sampler.java} (80%) create mode 100644 src/main/java/com/datadoghq/trace/SpanSerializer.java rename src/main/java/com/datadoghq/trace/{IWriter.java => Writer.java} (59%) create mode 100644 src/main/java/com/datadoghq/trace/impl/DDSpanSerializer.java create mode 100644 src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java create mode 100644 src/main/java/com/datadoghq/trace/writer/impl/DDApi.java diff --git a/.classpath b/.classpath new file mode 100644 index 0000000000..af1430be15 --- /dev/null +++ b/.classpath @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/.project b/.project new file mode 100644 index 0000000000..0a968338d1 --- /dev/null +++ b/.project @@ -0,0 +1,23 @@ + + + raclette-java + + + + + + org.eclipse.jdt.core.javabuilder + + + + + org.eclipse.m2e.core.maven2Builder + + + + + + org.eclipse.jdt.core.javanature + org.eclipse.m2e.core.maven2Nature + + diff --git a/pom.xml b/pom.xml index d8b90cf376..54bed60f72 100644 --- a/pom.xml +++ b/pom.xml @@ -1,51 +1,56 @@ - - 4.0.0 + + 4.0.0 - com.datadog - raclette-java - 1.0-SNAPSHOT + com.datadog + raclette-java + 1.0-SNAPSHOT - - - io.opentracing - opentracing-api - 0.21.0 - + + + io.opentracing + opentracing-api + 0.21.0 + + + + com.fasterxml.jackson.core + jackson-databind + 2.8.8 + - - - junit - junit - 4.12 - test - - - org.assertj - assertj-core - 3.6.2 - test - - - org.mockito - mockito-core - 2.7.22 - - + + + junit + junit + 4.12 + test + + + org.assertj + assertj-core + 3.6.2 + test + + + org.mockito + mockito-core + 2.7.22 + + - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.8 - 1.8 - - - - + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + \ No newline at end of file diff --git a/src/main/java/com/datadoghq/trace/ISampler.java b/src/main/java/com/datadoghq/trace/Sampler.java similarity index 80% rename from src/main/java/com/datadoghq/trace/ISampler.java rename to src/main/java/com/datadoghq/trace/Sampler.java index 75e0187f32..371ee3b3eb 100644 --- a/src/main/java/com/datadoghq/trace/ISampler.java +++ b/src/main/java/com/datadoghq/trace/Sampler.java @@ -3,7 +3,7 @@ package com.datadoghq.trace; import com.datadoghq.trace.impl.Span; -public interface ISampler { +public interface Sampler { public boolean sample(Span span); diff --git a/src/main/java/com/datadoghq/trace/SpanSerializer.java b/src/main/java/com/datadoghq/trace/SpanSerializer.java new file mode 100644 index 0000000000..9ec3a86711 --- /dev/null +++ b/src/main/java/com/datadoghq/trace/SpanSerializer.java @@ -0,0 +1,11 @@ +package com.datadoghq.trace; + +import io.opentracing.Span; + +public interface SpanSerializer { + + public String serialize(Span t) throws Exception; + + public Span deserialize(String str) throws Exception; + +} diff --git a/src/main/java/com/datadoghq/trace/IWriter.java b/src/main/java/com/datadoghq/trace/Writer.java similarity index 59% rename from src/main/java/com/datadoghq/trace/IWriter.java rename to src/main/java/com/datadoghq/trace/Writer.java index c542e78d12..ebf0360422 100644 --- a/src/main/java/com/datadoghq/trace/IWriter.java +++ b/src/main/java/com/datadoghq/trace/Writer.java @@ -1,8 +1,8 @@ package com.datadoghq.trace; -import com.datadoghq.trace.impl.Span; +import io.opentracing.Span; -public interface IWriter { +public interface Writer { public void write(Span span); diff --git a/src/main/java/com/datadoghq/trace/impl/DDSpanSerializer.java b/src/main/java/com/datadoghq/trace/impl/DDSpanSerializer.java new file mode 100644 index 0000000000..ceefa65ca2 --- /dev/null +++ b/src/main/java/com/datadoghq/trace/impl/DDSpanSerializer.java @@ -0,0 +1,36 @@ +package com.datadoghq.trace.impl; + +import java.io.IOException; + +import com.datadoghq.trace.SpanSerializer; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class DDSpanSerializer implements SpanSerializer { + + protected final ObjectMapper objectMapper = new ObjectMapper(); + + public String serialize(io.opentracing.Span t) throws JsonProcessingException { + return objectMapper.writeValueAsString(t); + } + + public io.opentracing.Span deserialize(String str) throws JsonParseException, JsonMappingException, IOException { + return objectMapper.readValue(str, Span.class); + } + + public static void main(String[] args) throws Exception{ + + Tracer tracer = new Tracer(); + io.opentracing.Span span = tracer.buildSpan("Hello!") + .withTag("port", 1234) + .withTag("bool", true) + .withTag("hello", "world") + .start(); + DDSpanSerializer ddSpanSerializer = new DDSpanSerializer(); + + System.out.println(ddSpanSerializer.serialize(span)); + + } +} diff --git a/src/main/java/com/datadoghq/trace/impl/Span.java b/src/main/java/com/datadoghq/trace/impl/Span.java index 5dc808d2b5..396f5136e1 100644 --- a/src/main/java/com/datadoghq/trace/impl/Span.java +++ b/src/main/java/com/datadoghq/trace/impl/Span.java @@ -13,14 +13,14 @@ public class Span implements io.opentracing.Span { private Map tags; private long startTime; private long durationMilliseconds; - private final SpanContext context; + private final com.datadoghq.trace.impl.SpanContext context; Span( Tracer tracer, String operationName, Map tags, Optional timestamp, - SpanContext context) { + com.datadoghq.trace.impl.SpanContext context) { this.tracer = tracer; this.operationName = operationName; this.tags = tags; @@ -104,4 +104,8 @@ public class Span implements io.opentracing.Span { return startTime; } + public com.datadoghq.trace.impl.SpanContext getContext(){ + return context; + } + } diff --git a/src/main/java/com/datadoghq/trace/impl/Tracer.java b/src/main/java/com/datadoghq/trace/impl/Tracer.java index 2734041a72..a8664a39d1 100644 --- a/src/main/java/com/datadoghq/trace/impl/Tracer.java +++ b/src/main/java/com/datadoghq/trace/impl/Tracer.java @@ -27,16 +27,16 @@ public class Tracer implements io.opentracing.Tracer { class SpanBuilder implements io.opentracing.Tracer.SpanBuilder { private final String operationName; - private Map tags = new HashMap(); + private Map tags = new HashMap(); private Long timestamp; - private Optional parent = Optional.empty(); + private Optional parent = Optional.empty(); public SpanBuilder(String operationName) { this.operationName = operationName; } public io.opentracing.Tracer.SpanBuilder asChildOf(SpanContext spanContext) { - this.parent = Optional.ofNullable(spanContext); + this.parent = Optional.ofNullable((com.datadoghq.trace.impl.SpanContext)spanContext); return this; } @@ -75,7 +75,7 @@ public class Tracer implements io.opentracing.Tracer { public Span start() { // build the context - SpanContext context = buildTheSpanContext(); + com.datadoghq.trace.impl.SpanContext context = buildTheSpanContext(); return new com.datadoghq.trace.impl.Span( Tracer.this, @@ -85,13 +85,13 @@ public class Tracer implements io.opentracing.Tracer { context); } - private SpanContext buildTheSpanContext() { + private com.datadoghq.trace.impl.SpanContext buildTheSpanContext() { - SpanContext context = null; + com.datadoghq.trace.impl.SpanContext context = null; long generatedId = generateNewId(); if (parent.isPresent()) { - com.datadoghq.trace.impl.SpanContext p = (com.datadoghq.trace.impl.SpanContext) parent.get(); + com.datadoghq.trace.impl.SpanContext p = parent.get(); context = new com.datadoghq.trace.impl.SpanContext( p.getTraceId(), generatedId, diff --git a/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java new file mode 100644 index 0000000000..5e407e2761 --- /dev/null +++ b/src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java @@ -0,0 +1,73 @@ +package com.datadoghq.trace.writer.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import com.datadoghq.trace.Writer; + +import io.opentracing.Span; + +public class DDAgentWriter implements Writer { + + protected static final String DEFAULT_HOSTNAME = "localhost"; + protected static final int DEFAULT_PORT = 8126; + + protected static final int DEFAULT_MAX_TRACES = 1000; + protected static final int DEFAULT_BATCH_SIZE = 10; + protected static final int DEFAULT_MAX_SERVICES = 1000; + protected static final long DEFAULT_TIMEOUT = 5000; + + protected final BlockingQueue commandQueue; + protected final Thread asyncWriterThread; + + public DDAgentWriter() { + super(); + commandQueue = new ArrayBlockingQueue(DEFAULT_MAX_TRACES); + + asyncWriterThread = new Thread(new SpansSendingTask(), "dd.DDAgentWriter-SpansSendingTask"); + asyncWriterThread.setDaemon(true); + asyncWriterThread.start(); + } + + public void write(Span span) { + try{ + //Try to add a new span in the queue + commandQueue.add(span); + }catch(IllegalStateException e){ + //It was not possible to add the span the queue is full! + //FIXME proper logging + System.out.println("Cannot add the following span as the async queue is full: "+span); + } + } + + public void close() { + asyncWriterThread.interrupt(); + } + + protected class SpansSendingTask implements Runnable { + public void run() { + while (true) { + try { + //Wait until a new span comes + Span span = commandQueue.take(); + + //Drain all spans up to a certain batch suze + List spans = new ArrayList(); + spans.add(span); + commandQueue.drainTo(spans, DEFAULT_BATCH_SIZE); + + //Then write to the agent + } catch (InterruptedException e) { + // TODO Auto-generated catch block + // FIXME proper logging + e.printStackTrace(); + + //The thread was interrupted, we break the LOOP + break; + } + } + } + } +} diff --git a/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java b/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java new file mode 100644 index 0000000000..8c6e5c45c2 --- /dev/null +++ b/src/main/java/com/datadoghq/trace/writer/impl/DDApi.java @@ -0,0 +1,59 @@ +package com.datadoghq.trace.writer.impl; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.List; + +import io.opentracing.Span; + +public class DDApi { + + protected static final String TRACES_ENDPOINT = "/v0.3/traces"; + protected static final String TRACES_SERVICES = "/v0.3/services"; + + protected final String host; + protected final int port; + protected final String tracesEndpoint; + protected final String servicesEndpoint; + + public DDApi(String host, int port) { + super(); + this.host = host; + this.port = port; + this.tracesEndpoint = "http://"+host+":"+port+TRACES_ENDPOINT; + this.servicesEndpoint = "http://"+host+":"+port+TRACES_SERVICES; + } + + public void sendSpans(List spans){ + + } + + public void sendServices(List services){ + + } + + private int callPUT(String endpoint,String content){ + try { + URL url = new URL(tracesEndpoint); + HttpURLConnection httpCon = (HttpURLConnection) url.openConnection(); + httpCon.setDoOutput(true); + httpCon.setRequestMethod("PUT"); + httpCon.setRequestProperty("Content-Type", "application/json"); + OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream()); + out.write("Resource content"); + out.close(); + return httpCon.getResponseCode(); + } catch (MalformedURLException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return -1; + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + return -1; + } + } +}