Merge remote-tracking branch 'origin/dev' into dev
# Conflicts:
#	src/main/java/com/datadoghq/trace/impl/DDSpan.java
#	src/main/java/com/datadoghq/trace/impl/DDSpanContext.java
#	src/main/java/com/datadoghq/trace/impl/DDTracer.java
commit 5846f45f4f
.classpath
@@ -22,5 +22,10 @@
 			<attribute name="maven.pomderived" value="true"/>
 		</attributes>
 	</classpathentry>
+	<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
+		<attributes>
+			<attribute name="maven.pomderived" value="true"/>
+		</attributes>
+	</classpathentry>
 	<classpathentry kind="output" path="target/classes"/>
 </classpath>
src/main/java/com/datadoghq/trace/SpanSerializer.java
@@ -2,12 +2,36 @@ package com.datadoghq.trace;
 
 import io.opentracing.Span;
 
 /**
  * Main interface to serialize/deserialize spans or collections of spans.
  */
 public interface SpanSerializer {
 
-    public String serialize(Span t) throws Exception;
+    /**
+     * Serialize a single span
+     *
+     * @param span the span to serialize
+     * @return the serialized object
+     * @throws Exception
+     */
+    public String serialize(Span span) throws Exception;
 
+    /**
+     * Serialize a collection of spans
+     *
+     * @param spans a List of spans or a List of Lists of spans
+     * @return the serialized objects
+     * @throws Exception
+     */
+    public String serialize(Object spans) throws Exception;
 
-    public Span deserialize(String str) throws Exception;
+    /**
+     * Deserialize a string and convert it into a Span or a Trace
+     *
+     * @param str the string to deserialize
+     * @return a Span or a Trace (List<Span>)
+     * @throws Exception
+     */
+    public Object deserialize(String str) throws Exception;
 
 }
src/main/java/com/datadoghq/trace/Writer.java
@@ -4,14 +4,20 @@ import java.util.List;
 
 import io.opentracing.Span;
 
 /**
  * A writer is responsible for sending collected spans to some place.
  */
 public interface Writer {
 
     /**
      * Write a trace represented by the entire list of all the finished spans
      *
-     * @param trace
+     * @param trace the list of spans to write
      */
     public void write(List<Span> trace);
 
     /**
      * Indicates to the writer that no future writing will come and that it should terminate all connections and tasks
      */
     public void close();
 }
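
For illustration, a minimal in-memory implementation of the Writer contract above (a hypothetical class, not part of this commit, useful e.g. in tests):

    import java.util.ArrayList;
    import java.util.List;

    import com.datadoghq.trace.Writer;

    import io.opentracing.Span;

    /** Hypothetical Writer that buffers finished traces in memory instead of sending them. */
    public class InMemoryWriter implements Writer {

        private final List<List<Span>> traces = new ArrayList<>();

        public void write(List<Span> trace) {
            traces.add(trace); // keep the trace around, e.g. for assertions in tests
        }

        public void close() {
            traces.clear(); // no connections or tasks to terminate, just drop the buffer
        }
    }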
src/main/java/com/datadoghq/trace/impl/DDSpan.java
@@ -22,13 +22,18 @@ public class DDSpan implements io.opentracing.Span {
     private final static Logger logger = LoggerFactory.getLogger(DDSpan.class);
 
     DDSpan(
             DDTracer tracer,
             String operationName,
+            List<Span> trace,
             Map<String, Object> tags,
+            Long timestampMilliseconds,
             DDSpanContext context) {
 
         this.tracer = tracer;
         this.operationName = operationName;
+        this.trace = Optional.ofNullable(trace).orElse(new ArrayList<>());
         this.tags = tags;
+        this.startTimeNano = Optional.ofNullable(timestampMilliseconds).orElse(Clock.systemUTC().millis()) * 1000000L;
         this.context = context;
 
+        // record the start time in nano (current milli + nano delta)
 
@@ -49,6 +54,8 @@ public class DDSpan implements io.opentracing.Span {
 
         logger.debug("Starting a new span. " + this.toString());
 
+    public void finish() {
+        finish(Clock.systemUTC().millis());
+    }
 
     public void finish(long stopTimeMillis) {
 
@@ -73,7 +80,6 @@ public class DDSpan implements io.opentracing.Span {
             logger.debug("Sending the trace to the writer");
 
         }
     }
 
-    public void finish() {
 
@@ -227,4 +233,34 @@ public class DDSpan implements io.opentracing.Span {
         result = 31 * result + (context != null ? context.hashCode() : 0);
         return result;
     }
 
+    @Override
+    public String toString() {
+        return context.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((context == null) ? 0 : context.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        DDSpan other = (DDSpan) obj;
+        if (context == null) {
+            if (other.context != null)
+                return false;
+        } else if (!context.equals(other.context))
+            return false;
+        return true;
+    }
 }
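
A note on the start-time arithmetic in the constructor above: it defaults to the current UTC wall clock when no timestamp is provided, then multiplies by 1,000,000 to convert milliseconds to nanoseconds. A self-contained sketch of the same pattern (plain Java, no tracer types; the class name is hypothetical):

    import java.time.Clock;
    import java.util.Optional;

    public class StartTimeDemo {
        public static void main(String[] args) {
            Long timestampMilliseconds = null; // the caller did not supply a start time
            // fall back to "now" in UTC millis, then convert millis -> nanos
            long startTimeNano =
                    Optional.ofNullable(timestampMilliseconds).orElse(Clock.systemUTC().millis()) * 1000000L;
            System.out.println(startTimeNano + " ns since the epoch");
        }
    }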
src/main/java/com/datadoghq/trace/impl/DDSpanSerializer.java
@@ -9,18 +9,30 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 
 import io.opentracing.Span;
 
+/**
+ * Main DDSpanSerializer: converts spans and traces to proper JSON
+ */
 public class DDSpanSerializer implements SpanSerializer {
 
     protected final ObjectMapper objectMapper = new ObjectMapper();
 
+    /* (non-Javadoc)
+     * @see com.datadoghq.trace.SpanSerializer#serialize(io.opentracing.Span)
+     */
     public String serialize(Span span) throws JsonProcessingException {
         return objectMapper.writeValueAsString(span);
     }
 
+    /* (non-Javadoc)
+     * @see com.datadoghq.trace.SpanSerializer#serialize(java.lang.Object)
+     */
     public String serialize(Object spans) throws JsonProcessingException {
         return objectMapper.writeValueAsString(spans);
     }
 
+    /* (non-Javadoc)
+     * @see com.datadoghq.trace.SpanSerializer#deserialize(java.lang.String)
+     */
     public io.opentracing.Span deserialize(String str) throws Exception {
         throw new UnsupportedOperationException("Deserialisation of spans is not implemented yet");
     }
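
For context, a short usage sketch of the serializer (a hypothetical snippet, not part of this commit; `spanOrTraces` can be a single io.opentracing.Span or a List<List<Span>>, and the exact JSON depends on the span implementation and Jackson's defaults):

    import com.datadoghq.trace.SpanSerializer;
    import com.datadoghq.trace.impl.DDSpanSerializer;

    public class SerializerDemo {
        // Hypothetical helper showing how the interface above is meant to be called
        public static String toJson(Object spanOrTraces) throws Exception {
            SpanSerializer serializer = new DDSpanSerializer();
            // both overloads route through Jackson's ObjectMapper.writeValueAsString
            return serializer.serialize(spanOrTraces);
        }
    }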
src/main/java/com/datadoghq/trace/writer/impl/DDAgentWriter.java
@@ -6,24 +6,57 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Semaphore;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datadoghq.trace.Writer;
 
 import io.opentracing.Span;
 
+/**
+ * This writer writes the provided traces to a DD agent, which is most of the time located on the same host.
+ *
+ * It handles writes asynchronously so the calling threads are automatically released. However, if too many spans are collected
+ * the writer can reach a state where it is forced to drop incoming spans.
+ */
 public class DDAgentWriter implements Writer {
 
+    protected static final Logger logger = LoggerFactory.getLogger(DDAgentWriter.class.getName());
 
+    /**
+     * Default location of the DD agent
+     */
+    protected static final String DEFAULT_HOSTNAME = "localhost";
+    protected static final int DEFAULT_PORT = 8126;
 
+    /**
+     * Maximum number of spans kept in memory
+     */
+    protected static final int DEFAULT_MAX_SPANS = 1000;
 
+    /**
+     * Maximum number of traces sent to the DD agent API at once
+     */
+    protected static final int DEFAULT_BATCH_SIZE = 10;
+    protected static final int DEFAULT_MAX_SERVICES = 1000;
+    protected static final long DEFAULT_TIMEOUT = 5000;
 
+    /**
+     * Used to ensure that we don't keep too many spans (while the blocking queue collects traces...)
+     */
+    private final Semaphore tokens;
 
     /**
      * In-memory collection of traces waiting for departure
      */
     protected final BlockingQueue<List<Span>> traces;
 
     /**
      * Async worker that posts the spans to the DD agent
      */
     protected final Thread asyncWriterThread;
 
     /**
      * The DD agent api
      */
     protected final DDApi api;
 
     public DDAgentWriter() {
 
@@ -38,6 +71,9 @@ public class DDAgentWriter implements Writer {
         asyncWriterThread.start();
     }
 
+    /* (non-Javadoc)
+     * @see com.datadoghq.trace.Writer#write(java.util.List)
+     */
     public void write(List<Span> trace) {
         // Try to add a new span in the queue
         boolean proceed = tokens.tryAcquire(trace.size());
 
@@ -45,21 +81,25 @@ public class DDAgentWriter implements Writer {
         if(proceed){
             traces.add(trace);
         }else{
-            //It was not possible to add the span, the queue is full!
-            //FIXME proper logging
-            System.out.println("Cannot add the following trace as the async queue is full: "+trace);
+            logger.warn("Cannot add a trace of "+trace.size()+" as the async queue is full. Queue max size:"+DEFAULT_MAX_SPANS);
         }
     }
 
+    /* (non-Javadoc)
+     * @see com.datadoghq.trace.Writer#close()
+     */
     public void close() {
         asyncWriterThread.interrupt();
         try {
             asyncWriterThread.join();
         } catch (InterruptedException e) {
             // Nothing to log as we expect an interrupted exception
             logger.info("Writer properly closed and async writer interrupted.");
         }
     }
 
     /**
      * Infinite task blocking until some spans arrive in the blocking queue.
      */
     protected class SpansSendingTask implements Runnable {
 
         protected final List<List<Span>> payload = new ArrayList<List<Span>>();
 
@@ -70,13 +110,11 @@ public class DDAgentWriter implements Writer {
                 // WAIT until a new span comes
                 payload.add(traces.take());
 
-                //FIXME proper logging
-                System.out.println("Start writing traces");
 
                 // Drain all spans up to a certain batch size
                 traces.drainTo(payload, DEFAULT_BATCH_SIZE);
 
                 // SEND the payload to the agent
+                logger.debug("Async writer about to write "+payload.size()+" traces.");
                 api.sendTraces(payload);
 
                 // Compute the number of spans sent
 
@@ -84,19 +122,15 @@ public class DDAgentWriter implements Writer {
                 for(List<Span> trace:payload){
                     spansCount+=trace.size();
                 }
 
-                //FIXME proper logging
-                System.out.println("Sent "+spansCount+" spans through "+payload.size()+" traces");
+                logger.debug("Async writer just sent "+spansCount+" spans through "+payload.size()+" traces");
 
                 // Force garbage collect of the payload
                 payload.clear();
 
                 // Release the tokens
                 tokens.release(spansCount);
             } catch (InterruptedException e) {
-                // TODO Auto-generated catch block
-                // FIXME proper logging
-                e.printStackTrace();
+                logger.info("Async writer interrupted.");
 
                 // The thread was interrupted, we break the LOOP
                 break;
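
The token scheme in this class deserves a standalone sketch: a Semaphore sized to the span budget guards the queue, tryAcquire sheds load instead of blocking the instrumented thread, and the consumer returns tokens only after a batch has been drained. A self-contained illustration of the pattern (generic payloads and hypothetical names, not the commit's code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Semaphore;

    public class TokenQueueDemo {
        static final int MAX_ITEMS = 1000;
        static final int BATCH_SIZE = 10;

        static final Semaphore tokens = new Semaphore(MAX_ITEMS);
        static final BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(MAX_ITEMS);

        /** Producer side: drop the batch instead of blocking when the budget is spent. */
        static boolean offer(List<Integer> batch) {
            if (!tokens.tryAcquire(batch.size())) {
                return false; // queue is full: shed load rather than block the caller
            }
            queue.add(batch);
            return true;
        }

        /** Consumer side: block for one batch, drain a few more, then return the tokens. */
        static void drainOnce() throws InterruptedException {
            List<List<Integer>> payload = new ArrayList<>();
            payload.add(queue.take());          // wait until something arrives
            queue.drainTo(payload, BATCH_SIZE); // opportunistically grab more
            int count = payload.stream().mapToInt(List::size).sum();
            tokens.release(count);              // free the budget for producers
        }
    }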
src/main/java/com/datadoghq/trace/writer/impl/DDApi.java
@@ -1,13 +1,14 @@
 package com.datadoghq.trace.writer.impl;
 
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datadoghq.trace.SpanSerializer;
 import com.datadoghq.trace.impl.DDSpan;
 import com.datadoghq.trace.impl.DDSpanSerializer;
 
@@ -15,8 +16,13 @@ import com.datadoghq.trace.impl.DDTracer;
 
 import io.opentracing.Span;
 
+/**
+ * The API pointing to a DD agent
+ */
 public class DDApi {
 
+    protected static final Logger logger = LoggerFactory.getLogger(DDApi.class.getName());
 
     protected static final String TRACES_ENDPOINT = "/v0.3/traces";
     protected static final String SERVICES_ENDPOINT = "/v0.3/services";
 
@@ -24,6 +30,10 @@ public class DDApi {
     protected final int port;
     protected final String tracesEndpoint;
     protected final String servicesEndpoint;
 
+    /**
+     * The spans serializer: can be replaced. By default, it serializes to JSON.
+     */
+    protected final SpanSerializer spanSerializer;
 
     public DDApi(String host, int port) {
 
@@ -39,49 +49,64 @@ public class DDApi {
         this.spanSerializer = spanSerializer;
     }
 
     /**
      * Send traces to the DD agent
      *
      * @param traces the traces to be sent
      * @return true if the traces were accepted (HTTP 200)
      */
     public boolean sendTraces(List<List<Span>> traces){
+        String payload = null;
         try {
-            String payload = spanSerializer.serialize(traces);
-            int status = callPUT(tracesEndpoint,payload);
-            if(status == 200){
-                return true;
-            }else{
-                //FIXME log status here
-                return false;
-            }
+            payload = spanSerializer.serialize(traces);
         } catch (Exception e) {
-            //FIXME proper exceptino
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            logger.error("Error during serialization of "+traces.size()+" traces.",e);
             return false;
         }
+
+        int status = callPUT(tracesEndpoint,payload);
+        if(status == 200){
+            logger.debug("Successfully sent "+traces.size()+" traces to the DD agent.");
+            return true;
+        }else{
+            logger.warn("Error while sending "+traces.size()+" traces to the DD agent. Status: "+status);
+            return false;
+        }
     }
 
     public boolean sendServices(List<String> services){
         return false;
     }
 
     /**
      * PUT the provided JSON content to an endpoint
      *
      * @param endpoint the agent endpoint to call
      * @param content the JSON payload
      * @return the status code
      */
     private int callPUT(String endpoint,String content){
+        HttpURLConnection httpCon = null;
         try{
             URL url = new URL(endpoint);
             httpCon = (HttpURLConnection) url.openConnection();
             httpCon.setDoOutput(true);
             httpCon.setRequestMethod("PUT");
             httpCon.setRequestProperty("Content-Type", "application/json");
+        } catch (Exception e) {
+            logger.warn("Error thrown before PUT call to the DD agent.",e);
+            return -1;
+        }
 
+        try{
             OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream());
             out.write(content);
             out.close();
-            return httpCon.getResponseCode();
-        } catch (MalformedURLException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-            return -1;
-        } catch (IOException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-            return -1;
-        }
+            int responseCode = httpCon.getResponseCode();
+            if(responseCode == 200){
+                logger.debug("Sent the payload to the DD agent.");
+            }else{
+                logger.warn("Could not send the payload to the DD agent. Status: "+httpCon.getResponseCode()+" ResponseMessage: "+httpCon.getResponseMessage());
+            }
+            return responseCode;
+        } catch (Exception e) {
+            logger.warn("Could not send the payload to the DD agent.",e);
+            return -1;
+        }
     }
 
@@ -119,7 +144,7 @@ public class DDApi {
         writer.write(((DDSpan) parent).getTrace());
 
         Thread.sleep(1000);
 
         writer.close();
 
     }
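
To tie the pieces together, a hedged wiring sketch (hypothetical snippet: the DDAgentWriter default constructor presumably targets DEFAULT_HOSTNAME:DEFAULT_PORT, and the endpoint URLs are assumed to be host, port, and the /v0.3 paths concatenated):

    // Hypothetical wiring, mirroring the defaults shown in this diff
    DDApi api = new DDApi("localhost", 8126);  // PUTs to .../v0.3/traces and .../v0.3/services
    boolean accepted = api.sendTraces(traces); // traces: List<List<Span>>, true on HTTP 200

    Writer writer = new DDAgentWriter();       // starts the async queue and worker thread
    writer.write(trace);                       // enqueue one finished trace, never blocks
    writer.close();                            // interrupt and join the worker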
src/main/resources/logback.xml
@@ -11,31 +11,30 @@
 		</layout>
 	</appender>
 
-	<appender name="file-json" class="ch.qos.logback.core.rolling.RollingFileAppender">
-		<file>${log_directory}/dd-tracer-json.log</file>
-		<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
-		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-			<fileNamePattern>
-				${log_directory}/dd-tracer-json.%d{yyyy-MM-dd}.%i.log
-			</fileNamePattern>
-			<timeBasedFileNamingAndTriggeringPolicy
-				class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-				<maxFileSize>10MB</maxFileSize>
-			</timeBasedFileNamingAndTriggeringPolicy>
-		</rollingPolicy>
-	</appender>
+	<!-- <appender name="file-json" class="ch.qos.logback.core.rolling.RollingFileAppender"> -->
+	<!-- <file>${log_directory}/dd-tracer-json.log</file> -->
+	<!-- <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> -->
+	<!-- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> -->
+	<!-- <fileNamePattern> -->
+	<!-- ${log_directory}/dd-tracer-json.%d{yyyy-MM-dd}.%i.log -->
+	<!-- </fileNamePattern> -->
+	<!-- <timeBasedFileNamingAndTriggeringPolicy -->
+	<!-- class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> -->
+	<!-- <maxFileSize>10MB</maxFileSize> -->
+	<!-- </timeBasedFileNamingAndTriggeringPolicy> -->
+	<!-- </rollingPolicy> -->
+	<!-- </appender> -->
 
 	<appender name="console-json" class="ch.qos.logback.core.ConsoleAppender">
 		<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
 	</appender>
 
-	<logger name="com.datadoghq.trace" level="info">
-		<appender-ref ref="file-json"/>
-	</logger>
+	<logger name="com.datadoghq.trace" level="trace">
+		<appender-ref ref="console-json"/>
+	</logger>
+	<!-- <logger name="com.datadoghq.trace" level="info">
+	<appender-ref ref="file-json"/>
+	</logger> -->
 
-	<root level="info">
-		<appender-ref ref="console-json"/>
+	<root level="trace">
+		<appender-ref ref="console"/>
 	</root>
 
 </configuration>