This commit is contained in:
Guillaume Polaert 2017-08-08 09:19:18 +02:00
parent a224fd0829
commit 0d79f0cf08
4 changed files with 32 additions and 58 deletions

View File

@ -48,7 +48,9 @@ public class Service {
public enum AppType {
WEB("web"),
DB("db"),
CUSTOM("custom");
CUSTOM("custom"),
CACHE("cache"),
WORKER("worker");
private final String type;

View File

@ -47,9 +47,6 @@ public class DDAgentWriter implements Writer {
/** Effective thread pool, where real logic is done */
private final ExecutorService executor = Executors.newSingleThreadExecutor();
/** In memory collection of services waiting for departure */
private final BlockingQueue<Map<String, Service>> services =
new ArrayBlockingQueue<>(MAX_QUEUE_SIZE);
/** Async worker that posts the spans to the DD agent */
private final ExecutorService executor = Executors.newFixedThreadPool(2);
@ -96,11 +93,20 @@ public class DDAgentWriter implements Writer {
@Override
public void writeServices(final Map<String, Service> services) {
if (!this.services.offer(services)) {
log.warn(
"Cannot add a service list to the async queue, queue is full. Queue max size: {}",
MAX_QUEUE_SIZE);
}
final Runnable task =
new Runnable() {
@Override
public void run() {
//SEND the payload to the agent
log.debug("Async writer about to write {} services", services.size());
if (api.sendServices(services)) {
log.debug("Async writer just sent {} services", services.size());
} else {
log.warn("Failed for Async writer to send {} services", services.size());
}
}
};
executor.submit(task);
}
/* (non-Javadoc)
@ -108,8 +114,6 @@ public class DDAgentWriter implements Writer {
*/
@Override
public void start() {
executor.submit(new SpansSendingTask());
executor.submit(new ServicesSendingTask());
scheduledExecutor.scheduleAtFixedRate(
new TracesSendingTask(), 0, FLUSH_TIME_SECONDS, TimeUnit.SECONDS);
}
@ -178,35 +182,4 @@ public class DDAgentWriter implements Writer {
}
}
}
/** Infinite tasks blocking until some spans come in the blocking queue. */
protected class ServicesSendingTask implements Runnable {
@Override
public void run() {
while (true) {
try {
//WAIT until a new service comes
final Map<String, Service> payload = DDAgentWriter.this.services.take();
//SEND the payload to the agent
log.debug("Async writer about to write {} services", payload.size());
if (api.sendServices(payload)) {
log.debug("Async writer just sent {} services", payload.size());
} else {
log.warn("Failed for Async writer to send {} services", payload.size());
}
} catch (final InterruptedException e) {
log.info("Async writer (services) interrupted.");
//The thread was interrupted, we break the LOOP
break;
} catch (final Throwable e) {
log.error("Unexpected error! Some services may have been dropped.", e);
}
}
}
}
}

View File

@ -13,11 +13,11 @@ class ServiceTest extends Specification {
def "getter and setter"() {
setup:
def service = new Service("service-name", "app-name", Service.AppType.CUSTOM)
def service = new Service("api-intake", "kafka", Service.AppType.CUSTOM)
expect:
service.getName() == "service-name"
service.getAppName() == "app-name"
service.getName() == "api-intake"
service.getAppName() == "kafka"
service.getAppType() == Service.AppType.CUSTOM
}
@ -25,10 +25,12 @@ class ServiceTest extends Specification {
def "enum"() {
expect:
Service.AppType.values().size() == 3
Service.AppType.values().size() == 5
Service.AppType.DB.toString() == "db"
Service.AppType.WEB.toString() == "web"
Service.AppType.CUSTOM.toString() == "custom"
Service.AppType.WORKER.toString() == "worker"
Service.AppType.CACHE.toString() == "cache"
}
@ -36,14 +38,14 @@ class ServiceTest extends Specification {
setup:
def tracer = new DDTracer()
def service = new Service("service-name", "app-name", Service.AppType.CUSTOM)
def service = new Service("api-intake", "kafka", Service.AppType.CUSTOM)
when:
tracer.addServiceInfo(service)
then:
tracer.getServiceInfo().size() == 1
tracer.getServiceInfo().get("service-name") == service
tracer.getServiceInfo().get("api-intake") == service
}
@ -55,7 +57,7 @@ class ServiceTest extends Specification {
when:
tracer.addServiceInfo(new Service("service-name", "app-name", Service.AppType.CUSTOM))
tracer.addServiceInfo(new Service("api-intake", "kafka", Service.AppType.CUSTOM))
then:
verify(writer, times(1)).writeServices(any(Map))

View File

@ -9,24 +9,20 @@ public class ExampleWithLoggingWriter {
public static void main(final String[] args) throws Exception {
final DDTracer tracer = new DDTracer(new LoggingWriter(), new AllSampler());
tracer.addServiceInfo(new Service("service-foo", "mongo", Service.AppType.WEB));
tracer.addServiceInfo(new Service("api-intake", "spark", Service.AppType.CACHE));
final Span parent =
tracer
.buildSpan("hello-world")
.withServiceName("service-foo")
.withSpanType("web")
.startManual();
tracer.buildSpan("fetch.backend").withServiceName("api-intake").startManual();
parent.setBaggageItem("a-baggage", "value");
parent.setBaggageItem("scope-id", "a-1337");
Thread.sleep(100);
final Span child =
tracer
.buildSpan("hello-world")
.buildSpan("delete.resource")
.asChildOf(parent)
.withResourceName("resource-name")
.withResourceName("delete")
.startManual();
Thread.sleep(100);
@ -36,5 +32,6 @@ public class ExampleWithLoggingWriter {
Thread.sleep(100);
parent.finish();
tracer.close();
}
}