Logging + comments

This commit is contained in:
renaudboutet 2017-04-27 15:22:29 +02:00
parent cf5f2541ca
commit 348143967a
8 changed files with 173 additions and 68 deletions

View File

@ -22,5 +22,10 @@
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="target/classes"/>
</classpath>

View File

@ -2,12 +2,36 @@ package com.datadoghq.trace;
import io.opentracing.Span;
/**
* Main interface to serialize/deserialize spans or collection of spans.
*/
public interface SpanSerializer {
public String serialize(Span t) throws Exception;
/**
* Serialize a single span
*
* @param the span to serialize
* @return the serialized object
* @throws Exception
*/
public String serialize(Span span) throws Exception;
/**
* A collection of Span to serialize
*
* @param spans List or List of list of Spans
* @return the serialized objects
* @throws Exception
*/
public String serialize(Object spans) throws Exception;
public Span deserialize(String str) throws Exception;
/**
* Deserialize a string to convert it in a Span or a Trace
*
* @param str the string to deserialize
* @return A Span or a Trace (List<Span>)
* @throws Exception
*/
public Object deserialize(String str) throws Exception;
}

View File

@ -4,14 +4,20 @@ import java.util.List;
import io.opentracing.Span;
/**
* A writer is responsible to send collected spans to some place
*/
public interface Writer {
/**
* Write a trace represented by the entire list of all the finished spans
*
* @param trace
* @param trace the list of spans to write
*/
public void write(List<Span> trace);
/**
* Indicates to the writer that no future writing will come and it should terminates all connections and tasks
*/
public void close();
}

View File

@ -9,18 +9,30 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentracing.Span;
/**
* Main DDSpanSerializer: convert spans and traces to proper JSON
*/
public class DDSpanSerializer implements SpanSerializer {
protected final ObjectMapper objectMapper = new ObjectMapper();
/* (non-Javadoc)
* @see com.datadoghq.trace.SpanSerializer#serialize(io.opentracing.Span)
*/
public String serialize(Span span) throws JsonProcessingException {
return objectMapper.writeValueAsString(span);
}
/* (non-Javadoc)
* @see com.datadoghq.trace.SpanSerializer#serialize(java.lang.Object)
*/
public String serialize(Object spans) throws JsonProcessingException {
return objectMapper.writeValueAsString(spans);
}
/* (non-Javadoc)
* @see com.datadoghq.trace.SpanSerializer#deserialize(java.lang.String)
*/
public io.opentracing.Span deserialize(String str) throws Exception {
throw new UnsupportedOperationException("Deserialisation of spans is not implemented yet");
}

View File

@ -164,6 +164,6 @@ public class DDTracer implements io.opentracing.Tracer {
}
long generateNewId() {
return System.nanoTime();
return Math.abs(UUID.randomUUID().getMostSignificantBits());
}
}

View File

@ -6,24 +6,57 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datadoghq.trace.Writer;
import io.opentracing.Span;
/**
* This writer write provided traces to the a DD agent which is most of time located on the same host.
*
* It handles writes asynchronuously so the calling threads are automatically released. However, if too much spans are collected
* the writers can reach a state where it is forced to drop incoming spans.
*/
public class DDAgentWriter implements Writer {
protected static final Logger logger = LoggerFactory.getLogger(DDAgentWriter.class.getName());
/**
* Default location of the DD agent
*/
protected static final String DEFAULT_HOSTNAME = "localhost";
protected static final int DEFAULT_PORT = 8126;
/**
* Maximum number of spans kept in memory
*/
protected static final int DEFAULT_MAX_SPANS = 1000;
/**
* Maximum number of traces sent to the DD agent API at once
*/
protected static final int DEFAULT_BATCH_SIZE = 10;
protected static final int DEFAULT_MAX_SERVICES = 1000;
protected static final long DEFAULT_TIMEOUT = 5000;
/**
* Used to ensure that we don't keep too many spans (while the blocking queue collect traces...)
*/
private final Semaphore tokens;
/**
* In memory collection of traces waiting for departure
*/
protected final BlockingQueue<List<Span>> traces;
/**
* Async worker that posts the spans to the DD agent
*/
protected final Thread asyncWriterThread;
/**
* The DD agent api
*/
protected final DDApi api;
public DDAgentWriter() {
@ -38,6 +71,9 @@ public class DDAgentWriter implements Writer {
asyncWriterThread.start();
}
/* (non-Javadoc)
* @see com.datadoghq.trace.Writer#write(java.util.List)
*/
public void write(List<Span> trace) {
//Try to add a new span in the queue
boolean proceed = tokens.tryAcquire(trace.size());
@ -45,21 +81,25 @@ public class DDAgentWriter implements Writer {
if(proceed){
traces.add(trace);
}else{
//It was not possible to add the span the queue is full!
//FIXME proper logging
System.out.println("Cannot add the following trace as the async queue is full: "+trace);
logger.warn("Cannot add a trace of "+trace.size()+" as the async queue is full. Queue max size:"+DEFAULT_MAX_SPANS);
}
}
/* (non-Javadoc)
* @see com.datadoghq.trace.Writer#close()
*/
public void close() {
asyncWriterThread.interrupt();
try {
asyncWriterThread.join();
} catch (InterruptedException e) {
//Nothing to log as we expect and interrupted exception
logger.info("Writer properly closed and async writer interrupted.");
}
}
/**
* Infinite tasks blocking until some spans come in the blocking queue.
*/
protected class SpansSendingTask implements Runnable {
protected final List<List<Span>> payload = new ArrayList<List<Span>>();
@ -70,13 +110,11 @@ public class DDAgentWriter implements Writer {
//WAIT until a new span comes
payload.add(traces.take());
//FIXME proper logging
System.out.println("Start writing traces");
//Drain all spans up to a certain batch suze
traces.drainTo(payload, DEFAULT_BATCH_SIZE);
//SEND the payload to the agent
logger.debug("Async writer about to write "+payload.size()+" traces.");
api.sendTraces(payload);
//Compute the number of spans sent
@ -84,19 +122,15 @@ public class DDAgentWriter implements Writer {
for(List<Span> trace:payload){
spansCount+=trace.size();
}
//FIXME proper logging
System.out.println("Sent "+spansCount+" spans through "+payload.size()+" traces");
logger.debug("Async writer just sent "+spansCount+" spans through "+payload.size()+" traces");
//Force garbage collect of the payload
payload.clear();
//Release the tokens
tokens.release(spansCount);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
// FIXME proper logging
e.printStackTrace();
logger.info("Async writer interrupted.");
//The thread was interrupted, we break the LOOP
break;

View File

@ -1,21 +1,27 @@
package com.datadoghq.trace.writer.impl;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datadoghq.trace.SpanSerializer;
import com.datadoghq.trace.impl.DDSpanSerializer;
import com.datadoghq.trace.impl.DDTracer;
import io.opentracing.Span;
/**
* The API pointing to a DD agent
*/
public class DDApi {
protected static final Logger logger = LoggerFactory.getLogger(DDApi.class.getName());
protected static final String TRACES_ENDPOINT = "/v0.3/traces";
protected static final String SERVICES_ENDPOINT = "/v0.3/services";
@ -23,6 +29,10 @@ public class DDApi {
protected final int port;
protected final String tracesEndpoint;
protected final String servicesEndpoint;
/**
* The spans serializer: can be replaced. By default, it serialize in JSON.
*/
protected final SpanSerializer spanSerializer;
public DDApi(String host, int port) {
@ -38,49 +48,64 @@ public class DDApi {
this.spanSerializer = spanSerializer;
}
/**
* Send traces to the DD agent
*
* @param traces the traces to be sent
* @return the staus code returned
*/
public boolean sendTraces(List<List<Span>> traces){
String payload = null;
try {
String payload = spanSerializer.serialize(traces);
int status = callPUT(tracesEndpoint,payload);
if(status == 200){
return true;
}else{
//FIXME log status here
return false;
}
payload = spanSerializer.serialize(traces);
} catch (Exception e) {
//FIXME proper exceptino
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("Error during serialization of "+traces.size()+" traces.",e);
return false;
}
int status = callPUT(tracesEndpoint,payload);
if(status == 200){
logger.debug("Succesfully sent "+traces.size()+" traces to the DD agent.");
return true;
}else{
logger.warn("Error while sending "+traces.size()+" traces to the DD agent. Status: "+status);
return false;
}
}
public boolean sendServices(List<String> services){
return false;
}
/**
* PUT to an endpoint the provided JSON content
*
* @param endpoint
* @param content
* @return the status code
*/
private int callPUT(String endpoint,String content){
HttpURLConnection httpCon = null;
try {
try{
URL url = new URL(endpoint);
httpCon = (HttpURLConnection) url.openConnection();
httpCon.setDoOutput(true);
httpCon.setRequestMethod("PUT");
httpCon.setRequestProperty("Content-Type", "application/json");
} catch (Exception e) {
logger.warn("Error thrown before PUT call to the DD agent.",e);
return -1;
}
try{
OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream());
out.write(content);
out.close();
return httpCon.getResponseCode();
} catch (MalformedURLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return -1;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
int responseCode = httpCon.getResponseCode();
if(responseCode != 200){
logger.debug("Sent the payload to the DD agent.");
}else{
logger.warn("Could not send the payload to the DD agent. Status: "+httpCon.getResponseCode()+" ResponseMessage: "+httpCon.getResponseMessage());
}
return responseCode;
} catch (Exception e) {
logger.warn("Could not send the payload to the DD agent.",e);
return -1;
}
}
@ -118,7 +143,7 @@ public class DDApi {
writer.write(array);
Thread.sleep(1000);
writer.close();
}

View File

@ -11,31 +11,30 @@
</layout>
</appender>
<appender name="file-json" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log_directory}/dd-tracer-json.log</file>
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>
${log_directory}/dd-tracer-json.%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>10MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
</appender>
<!-- <appender name="file-json" class="ch.qos.logback.core.rolling.RollingFileAppender"> -->
<!-- <file>${log_directory}/dd-tracer-json.log</file> -->
<!-- <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> -->
<!-- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> -->
<!-- <fileNamePattern> -->
<!-- ${log_directory}/dd-tracer-json.%d{yyyy-MM-dd}.%i.log -->
<!-- </fileNamePattern> -->
<!-- <timeBasedFileNamingAndTriggeringPolicy -->
<!-- class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> -->
<!-- <maxFileSize>10MB</maxFileSize> -->
<!-- </timeBasedFileNamingAndTriggeringPolicy> -->
<!-- </rollingPolicy> -->
<!-- </appender> -->
<appender name="console-json" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<logger name="com.datadoghq.trace" level="trace">
<appender-ref ref="console-json"/>
<!-- <logger name="com.datadoghq.trace" level="info">
<appender-ref ref="file-json"/>
</logger>
</logger> -->
<root level="info">
<appender-ref ref="console-json"/>
<root level="trace">
<appender-ref ref="console"/>
</root>
</configuration>