Remove exporter (#25)

Trask Stalnaker 2019-12-02 12:14:44 -08:00 committed by GitHub
parent 662e1792bb
commit 80f7f615e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 20 additions and 2970 deletions


@@ -142,44 +142,6 @@ jobs:
environment:
- TEST_TASK: testJava13
agent_integration_tests:
<<: *defaults
docker:
- image: *default_container
- image: datadog/docker-dd-agent
environment:
- DD_APM_ENABLED=true
- DD_BIND_HOST=0.0.0.0
- DD_API_KEY=invalid_key_but_this_is_fine
steps:
- checkout
- attach_workspace:
at: .
- restore_cache:
<<: *cache_keys
- run:
name: Run Trace Agent Tests
command: ./gradlew traceAgentTest --build-cache --parallel --stacktrace --no-daemon --max-workers=8
- run:
name: Collect Reports
when: on_fail
command: .circleci/collect_reports.sh
- store_artifacts:
path: ./reports
- run:
name: Collect Test Results
when: always
command: .circleci/collect_results.sh
- store_test_results:
path: ./results
check:
<<: *defaults
@@ -323,13 +285,6 @@ workflows:
tags:
only: /.*/
- agent_integration_tests:
requires:
- build
filters:
tags:
only: /.*/
- check:
requires:
- build
@@ -357,7 +312,6 @@ workflows:
- test_zulu11
- test_12
- test_13
- agent_integration_tests
- check
filters:
branches:
@@ -378,7 +332,6 @@ workflows:
- test_zulu11
- test_12
- test_13
- agent_integration_tests
- check
filters:
branches:


@@ -2,7 +2,6 @@ Component,Origin,License,Copyright
https://github.com/DataDog/dd-trace-java/blob/dev/dd-trace/src/main/java/com/datadoghq/trace/propagation/Codec.java,Uber,MIT,
import,io.opentracing,Apache-2.0,Copyright 2016-2017 The OpenTracing Authors
import,org.slf4j,MIT,Copyright (c) 2004-2017 QOS.ch
import,com.fasterxml.jackson,Apache-2.0,
import,com.google.auto.service.AutoService,Apache-2.0,"Copyright 2013 Google, Inc."
logback.xml,ch.qos.logback,EPL-1.0 OR LGPL-2.1,"Copyright (C) 1999-2015, QOS.ch. All rights reserved."
logback.xml,net.logstash.logback,Apache-2.0,



@@ -37,9 +37,6 @@ public final class Constants {
"net.bytebuddy",
// OT contribs for dd trace resolver
"io.opentracing.contrib",
// jackson
"org.msgpack",
"com.fasterxml.jackson",
"org.yaml.snakeyaml",
// disruptor
"com.lmax.disruptor",


@@ -7,7 +7,6 @@ server_type=$1
server_package=$2
agent_jars="${@:3}"
server_pid=""
agent_pid=$(lsof -i tcp:8126 | awk '$8 == "TCP" { print $2 }')
if [[ "$server_package" = "" ]] || [[ "$server_type" != "play-zip" && "$server_type" != "jar" ]]; then
echo "usage: ./run-perf-test.sh [play-zip|jar] path-to-server-package path-to-agent1 path-to-agent2..."
echo ""
@@ -23,14 +22,6 @@ if [[ "$server_package" = "" ]] || [[ "$server_type" != "play-zip" && "$server_t
exit 1
fi
if [ "$agent_pid" = "" ]; then
echo "discarding traces"
writer_type="LoggingWriter"
else
echo "sending traces to local trace agent: $agent_pid"
writer_type="DDAgentWriter"
fi
if [ -f perf-test-settings.rc ]; then
echo "loading custom settings"
cat ./perf-test-settings.rc
@@ -51,7 +42,7 @@ function start_server {
agent_jar="$1"
javaagent_arg=""
if [ "$agent_jar" != "" -a -f "$agent_jar" ]; then
javaagent_arg="-javaagent:$agent_jar -Ddatadog.slf4j.simpleLogger.defaultLogLevel=off -Ddd.writer.type=$writer_type -Ddd.service.name=perf-test-app"
javaagent_arg="-javaagent:$agent_jar -Ddatadog.slf4j.simpleLogger.defaultLogLevel=off -Ddd.service.name=perf-test-app"
fi
if [ "$server_type" = "jar" ]; then
@@ -150,13 +141,6 @@ for agent_jar in $agent_jars; do
fi
if [ "$agent_pid" = "" ]; then
agent_start_cpu=0
agent_start_rss=0
else
agent_start_cpu=$(ps -o 'pid,time' | awk "\$1 == $agent_pid { print \$2 }" | awk -F'[:\.]' '{ print ($1 * 3600) + ($2 * 60) + $3 }')
agent_start_rss=$(ps -o 'pid,rss' | awk "\$1 == $agent_pid { print \$2 }")
fi
server_start_cpu=$(ps -o 'pid,time' | awk "\$1 == $server_pid { print \$2 }" | awk -F'[:\.]' '{ print ($1 * 3600) + ($2 * 60) + $3 }')
server_start_rss=$(ps -o 'pid,rss' | awk "\$1 == $server_pid { print \$2 }")
@@ -177,17 +161,8 @@ for agent_jar in $agent_jars; do
rm $test_output_file
done
if [ "$agent_pid" = "" ]; then
agent_stop_cpu=0
agent_stop_rss=0
else
agent_stop_cpu=$(ps -o 'pid,time' | awk "\$1 == $agent_pid { print \$2 }" | awk -F'[:\.]' '{ print ($1 * 3600) + ($2 * 60) + $3 }')
agent_stop_rss=$(ps -o 'pid,rss' | awk "\$1 == $agent_pid { print \$2 }")
fi
server_stop_cpu=$(ps -o 'pid,time' | awk "\$1 == $server_pid { print \$2 }" | awk -F'[:\.]' '{ print ($1 * 3600) + ($2 * 60) + $3 }')
let agent_cpu=$agent_stop_cpu-$agent_start_cpu
let agent_rss=$agent_stop_rss-$agent_start_rss
let server_cpu=$server_stop_cpu-$server_start_cpu
server_load_increase_rss=$(echo "scale=2; ( $server_total_rss / $server_total_rss_count ) - $server_start_rss" | bc)
@@ -197,7 +172,7 @@ for agent_jar in $agent_jars; do
server_max_rss=$(awk '/.* maximum resident set size/ { print $1 }' $server_output)
rm $server_output
echo "$result_row,$agent_cpu,$server_cpu,$agent_rss,$server_max_rss,$server_start_rss,$server_load_increase_rss" >> $test_csv_file
echo "$result_row,$server_cpu,$server_max_rss,$server_start_rss,$server_load_increase_rss" >> $test_csv_file
echo "----/Testing agent $agent_jar----"
echo ""
done
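
The per-process CPU numbers in this script come from parsing the cumulative time column of ps, splitting on ':' or '.' and folding hours, minutes, and seconds into whole seconds. The same arithmetic as a small Java helper, for clarity (hypothetical name; assumes the H:MM:SS shape the awk expects):

// Equivalent of: awk -F'[:.]' '{ print ($1 * 3600) + ($2 * 60) + $3 }'
static long cpuSeconds(final String psTime) {
  final String[] f = psTime.split("[:.]");
  return Long.parseLong(f[0]) * 3600 + Long.parseLong(f[1]) * 60 + Long.parseLong(f[2]);
}
// cpuSeconds("0:01:30") == 90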


@@ -15,8 +15,6 @@ dependencies {
jmh group: 'org.mongodb', name: 'mongodb-driver-async', version: '3.4.2'
jmh(group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.119') {
exclude(module: 'httpclient')
exclude(module: 'jackson-databind')
exclude(module: 'jackson-dataformat-cbor')
}
jmh group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.6.0'
jmh group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.3'


@@ -1,19 +0,0 @@
package datadog.trace.agent
import datadog.opentracing.DDTraceOTInfo
import datadog.trace.api.DDTraceApiInfo
import spock.lang.Specification
class DDInfoTest extends Specification {
def "info accessible from api"() {
expect:
DDTraceApiInfo.VERSION == DDTraceOTInfo.VERSION
DDTraceApiInfo.VERSION != null
DDTraceApiInfo.VERSION != ""
DDTraceApiInfo.VERSION != "unknown"
DDTraceOTInfo.VERSION != null
DDTraceOTInfo.VERSION != ""
DDTraceOTInfo.VERSION != "unknown"
}
}


@@ -6,7 +6,6 @@ minimumBranchCoverage = 0.8
// These are tested outside of this module since this module mainly just defines 'API'
excludedClassesCoverage += [
'datadog.trace.api.DDSpanTypes',
'datadog.trace.api.DDTraceApiInfo',
'datadog.trace.api.GlobalTracer*',
'datadog.trace.api.CorrelationIdentifier',
'datadog.trace.api.DDTags',


@@ -46,10 +46,6 @@ public class Config {
public static final String TRACE_ENABLED = "trace.enabled";
public static final String INTEGRATIONS_ENABLED = "integrations.enabled";
public static final String WRITER_TYPE = "writer.type";
public static final String AGENT_HOST = "agent.host";
public static final String TRACE_AGENT_PORT = "trace.agent.port";
public static final String AGENT_PORT_LEGACY = "agent.port";
public static final String AGENT_UNIX_DOMAIN_SOCKET = "trace.agent.unix.domain.socket";
public static final String TRACE_RESOLVER_ENABLED = "trace.resolver.enabled";
public static final String GLOBAL_TAGS = "trace.global.tags";
public static final String SPAN_TAGS = "trace.span.tags";
@@ -70,10 +66,6 @@ public class Config {
public static final String RUNTIME_CONTEXT_FIELD_INJECTION =
"trace.runtime.context.field.injection";
public static final String HEALTH_METRICS_ENABLED = "trace.health.metrics.enabled";
public static final String HEALTH_METRICS_STATSD_HOST = "trace.health.metrics.statsd.host";
public static final String HEALTH_METRICS_STATSD_PORT = "trace.health.metrics.statsd.port";
public static final String LOGS_INJECTION_ENABLED = "logs.injection";
public static final String SERVICE_TAG = "service";
@@ -86,13 +78,8 @@ public class Config {
private static final boolean DEFAULT_TRACE_ENABLED = true;
public static final boolean DEFAULT_INTEGRATIONS_ENABLED = true;
public static final String DD_AGENT_WRITER_TYPE = "DDAgentWriter";
public static final String LOGGING_WRITER_TYPE = "LoggingWriter";
private static final String DEFAULT_AGENT_WRITER_TYPE = DD_AGENT_WRITER_TYPE;
public static final String DEFAULT_AGENT_HOST = "localhost";
public static final int DEFAULT_TRACE_AGENT_PORT = 8126;
public static final String DEFAULT_AGENT_UNIX_DOMAIN_SOCKET = null;
private static final String DEFAULT_WRITER_TYPE = LOGGING_WRITER_TYPE;
private static final boolean DEFAULT_RUNTIME_CONTEXT_FIELD_INJECTION = true;
@@ -107,9 +94,6 @@ public class Config {
private static final boolean DEFAULT_DB_CLIENT_HOST_SPLIT_BY_INSTANCE = false;
private static final int DEFAULT_PARTIAL_FLUSH_MIN_SPANS = 1000;
public static final boolean DEFAULT_METRICS_ENABLED = false;
// No default constants for metrics statsd support -- falls back to jmx fetch values
public static final boolean DEFAULT_LOGS_INJECTION_ENABLED = false;
private static final String SPLIT_BY_SPACE_OR_COMMA_REGEX = "[,\\s]+";
@@ -133,9 +117,6 @@ public class Config {
@Getter private final boolean traceEnabled;
@Getter private final boolean integrationsEnabled;
@Getter private final String writerType;
@Getter private final String agentHost;
@Getter private final int agentPort;
@Getter private final String agentUnixDomainSocket;
@Getter private final boolean traceResolverEnabled;
private final Map<String, String> globalTags;
private final Map<String, String> spanTags;
@@ -150,10 +131,6 @@ public class Config {
@Getter private final Integer partialFlushMinSpans;
@Getter private final boolean runtimeContextFieldInjection;
@Getter private final boolean healthMetricsEnabled;
@Getter private final String healthMetricsStatsdHost;
@Getter private final Integer healthMetricsStatsdPort;
@Getter private final boolean logsInjectionEnabled;
@Getter private final boolean reportHostName;
@@ -180,14 +157,7 @@ public class Config {
traceEnabled = getBooleanSettingFromEnvironment(TRACE_ENABLED, DEFAULT_TRACE_ENABLED);
integrationsEnabled =
getBooleanSettingFromEnvironment(INTEGRATIONS_ENABLED, DEFAULT_INTEGRATIONS_ENABLED);
writerType = getSettingFromEnvironment(WRITER_TYPE, DEFAULT_AGENT_WRITER_TYPE);
agentHost = getSettingFromEnvironment(AGENT_HOST, DEFAULT_AGENT_HOST);
agentPort =
getIntegerSettingFromEnvironment(
TRACE_AGENT_PORT,
getIntegerSettingFromEnvironment(AGENT_PORT_LEGACY, DEFAULT_TRACE_AGENT_PORT));
agentUnixDomainSocket =
getSettingFromEnvironment(AGENT_UNIX_DOMAIN_SOCKET, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET);
writerType = getSettingFromEnvironment(WRITER_TYPE, DEFAULT_WRITER_TYPE);
traceResolverEnabled =
getBooleanSettingFromEnvironment(TRACE_RESOLVER_ENABLED, DEFAULT_TRACE_RESOLVER_ENABLED);
@@ -228,12 +198,6 @@ public class Config {
getBooleanSettingFromEnvironment(
RUNTIME_CONTEXT_FIELD_INJECTION, DEFAULT_RUNTIME_CONTEXT_FIELD_INJECTION);
// Writer.Builder createMonitor will use the values of the JMX fetch & agent to fill-in defaults
healthMetricsEnabled =
getBooleanSettingFromEnvironment(HEALTH_METRICS_ENABLED, DEFAULT_METRICS_ENABLED);
healthMetricsStatsdHost = getSettingFromEnvironment(HEALTH_METRICS_STATSD_HOST, null);
healthMetricsStatsdPort = getIntegerSettingFromEnvironment(HEALTH_METRICS_STATSD_PORT, null);
logsInjectionEnabled =
getBooleanSettingFromEnvironment(LOGS_INJECTION_ENABLED, DEFAULT_LOGS_INJECTION_ENABLED);
@@ -262,14 +226,6 @@ public class Config {
integrationsEnabled =
getPropertyBooleanValue(properties, INTEGRATIONS_ENABLED, parent.integrationsEnabled);
writerType = properties.getProperty(WRITER_TYPE, parent.writerType);
agentHost = properties.getProperty(AGENT_HOST, parent.agentHost);
agentPort =
getPropertyIntegerValue(
properties,
TRACE_AGENT_PORT,
getPropertyIntegerValue(properties, AGENT_PORT_LEGACY, parent.agentPort));
agentUnixDomainSocket =
properties.getProperty(AGENT_UNIX_DOMAIN_SOCKET, parent.agentUnixDomainSocket);
traceResolverEnabled =
getPropertyBooleanValue(properties, TRACE_RESOLVER_ENABLED, parent.traceResolverEnabled);
@@ -310,14 +266,6 @@ public class Config {
getPropertyBooleanValue(
properties, RUNTIME_CONTEXT_FIELD_INJECTION, parent.runtimeContextFieldInjection);
healthMetricsEnabled =
getPropertyBooleanValue(properties, HEALTH_METRICS_ENABLED, DEFAULT_METRICS_ENABLED);
healthMetricsStatsdHost =
properties.getProperty(HEALTH_METRICS_STATSD_HOST, parent.healthMetricsStatsdHost);
healthMetricsStatsdPort =
getPropertyIntegerValue(
properties, HEALTH_METRICS_STATSD_PORT, parent.healthMetricsStatsdPort);
logsInjectionEnabled =
getBooleanSettingFromEnvironment(LOGS_INJECTION_ENABLED, DEFAULT_LOGS_INJECTION_ENABLED);
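
Net effect of the Config changes: the agent host/port/unix-socket settings and the statsd health-metrics settings disappear, and the default writer drops from DDAgentWriter to LoggingWriter. A hedged sketch of the new behavior, in the spirit of the tests below (assumes no dd.* system properties or DD_* env vars are set):

// Config.get(Properties) is the factory ConfigTest uses; with an empty
// Properties object the defaults shine through.
final Config config = Config.get(new java.util.Properties());
assert "LoggingWriter".equals(config.getWriterType()); // previously "DDAgentWriter"
// getAgentHost(), getAgentPort(), getAgentUnixDomainSocket() no longer exist.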


@@ -1,32 +0,0 @@
package datadog.trace.api;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DDTraceApiInfo {
public static final String VERSION;
static {
String v;
try (final BufferedReader br =
new BufferedReader(
new InputStreamReader(
DDTraceApiInfo.class.getResourceAsStream("/dd-trace-api.version"), "UTF-8"))) {
final StringBuilder sb = new StringBuilder();
for (int c = br.read(); c != -1; c = br.read()) sb.append((char) c);
v = sb.toString().trim();
} catch (final Exception e) {
v = "unknown";
}
VERSION = v;
log.info("dd-trace-api - version: {}", v);
}
public static void main(final String... args) {
System.out.println(VERSION);
}
}


@@ -5,15 +5,9 @@ import org.junit.Rule
import org.junit.contrib.java.lang.system.EnvironmentVariables
import org.junit.contrib.java.lang.system.RestoreSystemProperties
import static datadog.trace.api.Config.AGENT_HOST
import static datadog.trace.api.Config.AGENT_PORT_LEGACY
import static datadog.trace.api.Config.AGENT_UNIX_DOMAIN_SOCKET
import static datadog.trace.api.Config.CONFIGURATION_FILE
import static datadog.trace.api.Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE
import static datadog.trace.api.Config.GLOBAL_TAGS
import static datadog.trace.api.Config.HEALTH_METRICS_ENABLED
import static datadog.trace.api.Config.HEALTH_METRICS_STATSD_HOST
import static datadog.trace.api.Config.HEALTH_METRICS_STATSD_PORT
import static datadog.trace.api.Config.HTTP_CLIENT_ERROR_STATUSES
import static datadog.trace.api.Config.HTTP_CLIENT_HOST_SPLIT_BY_DOMAIN
import static datadog.trace.api.Config.HTTP_SERVER_ERROR_STATUSES
@@ -25,7 +19,6 @@ import static datadog.trace.api.Config.RUNTIME_ID_TAG
import static datadog.trace.api.Config.SERVICE_NAME
import static datadog.trace.api.Config.SERVICE_TAG
import static datadog.trace.api.Config.SPAN_TAGS
import static datadog.trace.api.Config.TRACE_AGENT_PORT
import static datadog.trace.api.Config.TRACE_ENABLED
import static datadog.trace.api.Config.TRACE_REPORT_HOSTNAME
import static datadog.trace.api.Config.TRACE_RESOLVER_ENABLED
@@ -41,8 +34,6 @@ class ConfigTest extends DDSpecification {
private static final DD_TRACE_ENABLED_ENV = "DD_TRACE_ENABLED"
private static final DD_WRITER_TYPE_ENV = "DD_WRITER_TYPE"
private static final DD_SPAN_TAGS_ENV = "DD_SPAN_TAGS"
private static final DD_TRACE_AGENT_PORT_ENV = "DD_TRACE_AGENT_PORT"
private static final DD_AGENT_PORT_LEGACY_ENV = "DD_AGENT_PORT"
private static final DD_TRACE_REPORT_HOSTNAME = "DD_TRACE_REPORT_HOSTNAME"
def "verify defaults"() {
@@ -52,10 +43,7 @@ class ConfigTest extends DDSpecification {
then:
config.serviceName == "unnamed-java-app"
config.traceEnabled == true
config.writerType == "DDAgentWriter"
config.agentHost == "localhost"
config.agentPort == 8126
config.agentUnixDomainSocket == null
config.writerType == "LoggingWriter"
config.traceResolverEnabled == true
config.mergedSpanTags == [:]
config.mergedJmxTags == [(RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE_TAG): config.serviceName]
@@ -66,9 +54,6 @@ class ConfigTest extends DDSpecification {
config.partialFlushMinSpans == 1000
config.reportHostName == false
config.runtimeContextFieldInjection == true
config.healthMetricsEnabled == false
config.healthMetricsStatsdHost == null
config.healthMetricsStatsdPort == null
config.toString().contains("unnamed-java-app")
where:
@@ -85,10 +70,6 @@ class ConfigTest extends DDSpecification {
prop.setProperty(SERVICE_NAME, "something else")
prop.setProperty(TRACE_ENABLED, "false")
prop.setProperty(WRITER_TYPE, "LoggingWriter")
prop.setProperty(AGENT_HOST, "somehost")
prop.setProperty(TRACE_AGENT_PORT, "123")
prop.setProperty(AGENT_UNIX_DOMAIN_SOCKET, "somepath")
prop.setProperty(AGENT_PORT_LEGACY, "456")
prop.setProperty(TRACE_RESOLVER_ENABLED, "false")
prop.setProperty(GLOBAL_TAGS, "b:2")
prop.setProperty(SPAN_TAGS, "c:3")
@@ -100,9 +81,6 @@ class ConfigTest extends DDSpecification {
prop.setProperty(PARTIAL_FLUSH_MIN_SPANS, "15")
prop.setProperty(TRACE_REPORT_HOSTNAME, "true")
prop.setProperty(RUNTIME_CONTEXT_FIELD_INJECTION, "false")
prop.setProperty(HEALTH_METRICS_ENABLED, "true")
prop.setProperty(HEALTH_METRICS_STATSD_HOST, "metrics statsd host")
prop.setProperty(HEALTH_METRICS_STATSD_PORT, "654")
when:
Config config = Config.get(prop)
@@ -111,9 +89,6 @@ class ConfigTest extends DDSpecification {
config.serviceName == "something else"
config.traceEnabled == false
config.writerType == "LoggingWriter"
config.agentHost == "somehost"
config.agentPort == 123
config.agentUnixDomainSocket == "somepath"
config.traceResolverEnabled == false
config.mergedSpanTags == [b: "2", c: "3"]
config.mergedJmxTags == [b: "2", d: "4", (RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE_TAG): config.serviceName]
@@ -124,9 +99,6 @@ class ConfigTest extends DDSpecification {
config.partialFlushMinSpans == 15
config.reportHostName == true
config.runtimeContextFieldInjection == false
config.healthMetricsEnabled == true
config.healthMetricsStatsdHost == "metrics statsd host"
config.healthMetricsStatsdPort == 654
}
def "specify overrides via system properties"() {
@@ -134,10 +106,6 @@ class ConfigTest extends DDSpecification {
System.setProperty(PREFIX + SERVICE_NAME, "something else")
System.setProperty(PREFIX + TRACE_ENABLED, "false")
System.setProperty(PREFIX + WRITER_TYPE, "LoggingWriter")
System.setProperty(PREFIX + AGENT_HOST, "somehost")
System.setProperty(PREFIX + TRACE_AGENT_PORT, "123")
System.setProperty(PREFIX + AGENT_UNIX_DOMAIN_SOCKET, "somepath")
System.setProperty(PREFIX + AGENT_PORT_LEGACY, "456")
System.setProperty(PREFIX + TRACE_RESOLVER_ENABLED, "false")
System.setProperty(PREFIX + GLOBAL_TAGS, "b:2")
System.setProperty(PREFIX + SPAN_TAGS, "c:3")
@@ -149,9 +117,6 @@ class ConfigTest extends DDSpecification {
System.setProperty(PREFIX + PARTIAL_FLUSH_MIN_SPANS, "25")
System.setProperty(PREFIX + TRACE_REPORT_HOSTNAME, "true")
System.setProperty(PREFIX + RUNTIME_CONTEXT_FIELD_INJECTION, "false")
System.setProperty(PREFIX + HEALTH_METRICS_ENABLED, "true")
System.setProperty(PREFIX + HEALTH_METRICS_STATSD_HOST, "metrics statsd host")
System.setProperty(PREFIX + HEALTH_METRICS_STATSD_PORT, "654")
when:
Config config = new Config()
@@ -160,9 +125,6 @@ class ConfigTest extends DDSpecification {
config.serviceName == "something else"
config.traceEnabled == false
config.writerType == "LoggingWriter"
config.agentHost == "somehost"
config.agentPort == 123
config.agentUnixDomainSocket == "somepath"
config.traceResolverEnabled == false
config.mergedSpanTags == [b: "2", c: "3"]
config.mergedJmxTags == [b: "2", d: "4", (RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE_TAG): config.serviceName]
@@ -173,9 +135,6 @@ class ConfigTest extends DDSpecification {
config.partialFlushMinSpans == 25
config.reportHostName == true
config.runtimeContextFieldInjection == false
config.healthMetricsEnabled == true
config.healthMetricsStatsdHost == "metrics statsd host"
config.healthMetricsStatsdPort == 654
}
def "specify overrides via env vars"() {
@@ -199,21 +158,16 @@ class ConfigTest extends DDSpecification {
setup:
environmentVariables.set(DD_SERVICE_NAME_ENV, "still something else")
environmentVariables.set(DD_WRITER_TYPE_ENV, "LoggingWriter")
environmentVariables.set(DD_TRACE_AGENT_PORT_ENV, "777")
System.setProperty(PREFIX + SERVICE_NAME, "what we actually want")
System.setProperty(PREFIX + WRITER_TYPE, "DDAgentWriter")
System.setProperty(PREFIX + AGENT_HOST, "somewhere")
System.setProperty(PREFIX + TRACE_AGENT_PORT, "123")
System.setProperty(PREFIX + WRITER_TYPE, "LoggingWriter")
when:
def config = new Config()
then:
config.serviceName == "what we actually want"
config.writerType == "DDAgentWriter"
config.agentHost == "somewhere"
config.agentPort == 123
config.writerType == "LoggingWriter"
}
def "default when configured incorrectly"() {
@@ -221,9 +175,6 @@ class ConfigTest extends DDSpecification {
System.setProperty(PREFIX + SERVICE_NAME, " ")
System.setProperty(PREFIX + TRACE_ENABLED, " ")
System.setProperty(PREFIX + WRITER_TYPE, " ")
System.setProperty(PREFIX + AGENT_HOST, " ")
System.setProperty(PREFIX + TRACE_AGENT_PORT, " ")
System.setProperty(PREFIX + AGENT_PORT_LEGACY, "invalid")
System.setProperty(PREFIX + TRACE_RESOLVER_ENABLED, " ")
System.setProperty(PREFIX + SPAN_TAGS, "invalid")
System.setProperty(PREFIX + HTTP_SERVER_ERROR_STATUSES, "1111")
@ -238,8 +189,6 @@ class ConfigTest extends DDSpecification {
config.serviceName == " "
config.traceEnabled == true
config.writerType == " "
config.agentHost == " "
config.agentPort == 8126
config.traceResolverEnabled == true
config.mergedSpanTags == [:]
config.httpServerErrorStatuses == (500..599).toSet()
@@ -248,57 +197,12 @@ class ConfigTest extends DDSpecification {
config.dbClientSplitByInstance == false
}
def "sys props and env vars overrides for trace_agent_port and agent_port_legacy as expected"() {
setup:
if (overridePortEnvVar) {
environmentVariables.set(DD_TRACE_AGENT_PORT_ENV, "777")
}
if (overrideLegacyPortEnvVar) {
environmentVariables.set(DD_AGENT_PORT_LEGACY_ENV, "888")
}
if (overridePort) {
System.setProperty(PREFIX + TRACE_AGENT_PORT, "123")
}
if (overrideLegacyPort) {
System.setProperty(PREFIX + AGENT_PORT_LEGACY, "456")
}
when:
def config = new Config()
then:
config.agentPort == expectedPort
where:
overridePort | overrideLegacyPort | overridePortEnvVar | overrideLegacyPortEnvVar | expectedPort
true | true | false | false | 123
true | false | false | false | 123
false | true | false | false | 456
false | false | false | false | 8126
true | true | true | false | 123
true | false | true | false | 123
false | true | true | false | 777 // env var gets picked up instead.
false | false | true | false | 777 // env var gets picked up instead.
true | true | false | true | 123
true | false | false | true | 123
false | true | false | true | 456
false | false | false | true | 888 // legacy env var gets picked up instead.
true | true | true | true | 123
true | false | true | true | 123
false | true | true | true | 777 // env var gets picked up instead.
false | false | true | true | 777 // env var gets picked up instead.
}
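// Note on the table above: for each key the system property wins over the
// env var, and trace.agent.port is exhausted before the legacy agent.port.
// That is why DD_TRACE_AGENT_PORT=777 beat -Ddd.agent.port=456 in the rows
// marked "env var gets picked up instead", while -Ddd.trace.agent.port=123
// beat everything.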
def "sys props override properties"() {
setup:
Properties properties = new Properties()
properties.setProperty(SERVICE_NAME, "something else")
properties.setProperty(TRACE_ENABLED, "false")
properties.setProperty(WRITER_TYPE, "LoggingWriter")
properties.setProperty(AGENT_HOST, "somehost")
properties.setProperty(TRACE_AGENT_PORT, "123")
properties.setProperty(AGENT_UNIX_DOMAIN_SOCKET, "somepath")
properties.setProperty(TRACE_RESOLVER_ENABLED, "false")
properties.setProperty(GLOBAL_TAGS, "b:2")
properties.setProperty(SPAN_TAGS, "c:3")
@@ -316,9 +220,6 @@ class ConfigTest extends DDSpecification {
config.serviceName == "something else"
config.traceEnabled == false
config.writerType == "LoggingWriter"
config.agentHost == "somehost"
config.agentPort == 123
config.agentUnixDomainSocket == "somepath"
config.traceResolverEnabled == false
config.mergedSpanTags == [b: "2", c: "3"]
config.mergedJmxTags == [b: "2", d: "4", (RUNTIME_ID_TAG): config.getRuntimeId(), (SERVICE_TAG): config.serviceName]
@@ -335,7 +236,7 @@ class ConfigTest extends DDSpecification {
then:
config.serviceName == "unnamed-java-app"
config.writerType == "DDAgentWriter"
config.writerType == "LoggingWriter"
}
def "override empty properties"() {
@@ -347,7 +248,7 @@ class ConfigTest extends DDSpecification {
then:
config.serviceName == "unnamed-java-app"
config.writerType == "DDAgentWriter"
config.writerType == "LoggingWriter"
}
def "override non empty properties"() {
@@ -360,7 +261,7 @@ class ConfigTest extends DDSpecification {
then:
config.serviceName == "unnamed-java-app"
config.writerType == "DDAgentWriter"
config.writerType == "LoggingWriter"
}
def "verify integration config"() {


@@ -11,10 +11,7 @@ minimumBranchCoverage = 0.5
minimumInstructionCoverage = 0.5
excludedClassesCoverage += [
'datadog.trace.common.writer.ListWriter',
'datadog.trace.common.writer.LoggingWriter',
// This code is copied from okHttp samples and we have integration tests to verify that it works.
'datadog.trace.common.writer.unixdomainsockets.TunnelingUnixSocket',
'datadog.trace.common.writer.unixdomainsockets.UnixDomainSocketFactory'
'datadog.trace.common.writer.LoggingWriter'
]
apply plugin: 'org.unbroken-dome.test-sets'
@@ -22,7 +19,6 @@ apply plugin: 'org.unbroken-dome.test-sets'
testSets {
ot31CompatabilityTest
ot33CompatabilityTest
traceAgentTest
}
dependencies {
@@ -33,9 +29,6 @@ dependencies {
compile deps.opentracing
compile group: 'io.opentracing.contrib', name: 'opentracing-tracerresolver', version: '0.1.0'
compile group: 'com.datadoghq', name: 'java-dogstatsd-client', version: '2.1.1'
compile deps.jackson
compile deps.slf4j
compile group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.11.0' // Last version to support Java7
compile group: 'com.github.jnr', name: 'jnr-unixsocket', version: '0.23'
@@ -49,8 +42,6 @@ dependencies {
testCompile project(':utils:gc-utils')
testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.17.1'
traceAgentTestCompile deps.testcontainers
ot31CompatabilityTestCompile group: 'io.opentracing', name: 'opentracing-api', version: '0.31.0'
ot31CompatabilityTestCompile group: 'io.opentracing', name: 'opentracing-util', version: '0.31.0'
ot31CompatabilityTestCompile group: 'io.opentracing', name: 'opentracing-noop', version: '0.31.0'


@@ -1,129 +0,0 @@
package datadog.opentracing;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* Parses container information from /proc/self/cgroup. Implementation based largely on
* Qard/container-info
*/
@Getter
@Setter
@Slf4j
public class ContainerInfo {
private static final Path CGROUP_DEFAULT_PROCFILE = Paths.get("/proc/self/cgroup");
private static final String UUID_REGEX =
"[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}";
private static final String CONTAINER_REGEX = "[0-9a-f]{64}";
private static final Pattern LINE_PATTERN = Pattern.compile("(\\d+):([^:]*):(.+)$");
private static final Pattern POD_PATTERN =
Pattern.compile("(?:.+)?pod(" + UUID_REGEX + ")(?:.slice)?$");
private static final Pattern CONTAINER_PATTERN =
Pattern.compile("(?:.+)?(" + UUID_REGEX + "|" + CONTAINER_REGEX + ")(?:.scope)?$");
private static final ContainerInfo INSTANCE;
public String containerId;
public String podId;
public List<CGroupInfo> cGroups = new ArrayList<>();
static {
ContainerInfo containerInfo = new ContainerInfo();
if (ContainerInfo.isRunningInContainer()) {
try {
containerInfo = ContainerInfo.fromDefaultProcFile();
} catch (final IOException | ParseException e) {
log.error("Unable to parse proc file");
}
}
INSTANCE = containerInfo;
}
@Getter
@Setter
public static class CGroupInfo {
public int id;
public String path;
public List<String> controllers;
public String containerId;
public String podId;
}
public static ContainerInfo get() {
return INSTANCE;
}
public static boolean isRunningInContainer() {
return Files.isReadable(CGROUP_DEFAULT_PROCFILE)
&& CGROUP_DEFAULT_PROCFILE.toFile().length() > 0;
}
public static ContainerInfo fromDefaultProcFile() throws IOException, ParseException {
final String content = new String(Files.readAllBytes(CGROUP_DEFAULT_PROCFILE));
return parse(content);
}
public static ContainerInfo parse(final String cgroupsContent) throws ParseException {
final ContainerInfo containerInfo = new ContainerInfo();
final String[] lines = cgroupsContent.split("\n");
for (final String line : lines) {
final CGroupInfo cGroupInfo = parseLine(line);
containerInfo.getCGroups().add(cGroupInfo);
if (cGroupInfo.getPodId() != null) {
containerInfo.setPodId(cGroupInfo.getPodId());
}
if (cGroupInfo.getContainerId() != null) {
containerInfo.setContainerId(cGroupInfo.getContainerId());
}
}
return containerInfo;
}
static CGroupInfo parseLine(final String line) throws ParseException {
final Matcher matcher = LINE_PATTERN.matcher(line);
if (!matcher.matches()) {
throw new ParseException("Unable to match cgroup", 0);
}
final CGroupInfo cGroupInfo = new CGroupInfo();
cGroupInfo.setId(Integer.parseInt(matcher.group(1)));
cGroupInfo.setControllers(Arrays.asList(matcher.group(2).split(",")));
final String path = matcher.group(3);
final String[] pathParts = path.split("/");
cGroupInfo.setPath(path);
if (pathParts.length >= 1) {
final Matcher containerIdMatcher = CONTAINER_PATTERN.matcher(pathParts[pathParts.length - 1]);
final String containerId = containerIdMatcher.matches() ? containerIdMatcher.group(1) : null;
cGroupInfo.setContainerId(containerId);
}
if (pathParts.length >= 2) {
final Matcher podIdMatcher = POD_PATTERN.matcher(pathParts[pathParts.length - 2]);
final String podId = podIdMatcher.matches() ? podIdMatcher.group(1) : null;
cGroupInfo.setPodId(podId);
}
return cGroupInfo;
}
}
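
A worked example of the line format parseLine handles; the method is package-private, so this would sit in a test in the same package. The cgroup line is hypothetical but follows the Kubernetes layout the patterns target:

// One line of /proc/self/cgroup on a Kubernetes node (made-up IDs).
final String line =
    "11:memory:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/"
        + "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1";
final ContainerInfo.CGroupInfo info = ContainerInfo.parseLine(line); // declares ParseException
// LINE_PATTERN splits the line into id, controller list, and path.
assert info.getId() == 11 && info.getControllers().contains("memory");
// CONTAINER_PATTERN matches the last path segment (64 hex characters)...
assert info.getContainerId().endsWith("da4518a1");
// ...and POD_PATTERN pulls the UUID out of the second-to-last segment.
assert "3d274242-8ee0-11e9-a8a6-1e68d864ef1a".equals(info.getPodId());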


@@ -2,8 +2,6 @@ package datadog.opentracing;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import datadog.trace.api.DDTags;
import datadog.trace.common.util.Clock;
import io.opentracing.Span;
@@ -12,7 +10,6 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.ref.WeakReference;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -74,7 +71,6 @@ public class DDSpan implements Span {
context.getTrace().registerSpan(this);
}
@JsonIgnore
public boolean isFinished() {
return durationNano.get() != 0;
}
@@ -116,18 +112,15 @@
*
* @return true if root, false otherwise
*/
@JsonIgnore
public final boolean isRootSpan() {
return BigInteger.ZERO.equals(context.getParentId());
}
@Deprecated
@JsonIgnore
public DDSpan getRootSpan() {
return getLocalRootSpan();
}
@JsonIgnore
public DDSpan getLocalRootSpan() {
return context().getTrace().getRootSpan();
}
@@ -273,93 +266,61 @@
return this;
}
// Getters and JSON serialisation instructions
/**
* Stringified tags
*
* @return stringified tags
*/
@JsonGetter
public Map<String, String> getMeta() {
final Map<String, String> meta = new HashMap<>();
for (final Map.Entry<String, Object> entry : getTags().entrySet()) {
meta.put(entry.getKey(), String.valueOf(entry.getValue()));
}
return meta;
}
// Getters
/**
* Span metrics.
*
* @return metrics for this span
*/
@JsonGetter
public Map<String, Number> getMetrics() {
return context.getMetrics();
}
@JsonGetter("start")
public long getStartTime() {
return startTimeNano > 0 ? startTimeNano : TimeUnit.MICROSECONDS.toNanos(startTimeMicro);
}
@JsonGetter("duration")
public long getDurationNano() {
return durationNano.get();
}
@JsonGetter("service")
public String getServiceName() {
return context.getServiceName();
}
@JsonGetter("trace_id")
public BigInteger getTraceId() {
return context.getTraceId();
}
@JsonGetter("span_id")
public BigInteger getSpanId() {
return context.getSpanId();
}
@JsonGetter("parent_id")
public BigInteger getParentId() {
return context.getParentId();
}
@JsonGetter("resource")
public String getResourceName() {
return context.getResourceName();
}
@JsonGetter("name")
public String getOperationName() {
return context.getOperationName();
}
@JsonIgnore
public String getSpanType() {
return context.getSpanType();
}
@JsonIgnore
public Map<String, Object> getTags() {
return context().getTags();
}
@JsonGetter
public String getType() {
return context.getSpanType();
}
@JsonIgnore
public Boolean isError() {
return context.getErrorFlag();
}
@JsonGetter
public int getError() {
return context.getErrorFlag() ? 1 : 0;
}
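
Taken together, the @JsonGetter names removed here defined the agent-facing field layout: Jackson rendered each span roughly as the shape below, with getMeta() stringifying the tag values. Field names come from the annotations above; the values are hypothetical.

// {"trace_id": 1, "span_id": 2, "parent_id": 0,
//  "name": "servlet.request", "resource": "GET /", "service": "my-service",
//  "type": "web", "start": 1575312884000000000, "duration": 1200000,
//  "error": 0, "meta": {"component": "servlet"}, "metrics": {}}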


@@ -1,6 +1,5 @@
package datadog.opentracing;
import com.fasterxml.jackson.annotation.JsonIgnore;
import datadog.opentracing.decorators.AbstractDecorator;
import datadog.trace.api.DDTags;
import java.math.BigInteger;
@@ -165,12 +164,10 @@ public class DDSpanContext implements io.opentracing.SpanContext {
return Collections.emptyList();
}
@JsonIgnore
public PendingTrace getTrace() {
return trace;
}
@JsonIgnore
public DDTracer getTracer() {
return tracer;
}


@@ -1,37 +0,0 @@
package datadog.opentracing;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DDTraceOTInfo {
public static final String JAVA_VERSION = System.getProperty("java.version", "unknown");
public static final String JAVA_VM_NAME = System.getProperty("java.vm.name", "unknown");
public static final String JAVA_VM_VENDOR = System.getProperty("java.vm.vendor", "unknown");
public static final String VERSION;
static {
String v;
try (final BufferedReader br =
new BufferedReader(
new InputStreamReader(
DDTraceOTInfo.class.getResourceAsStream("/dd-trace-ot.version"), "UTF-8"))) {
final StringBuilder sb = new StringBuilder();
for (int c = br.read(); c != -1; c = br.read()) sb.append((char) c);
v = sb.toString().trim();
} catch (final Exception e) {
v = "unknown";
}
VERSION = v;
log.info("dd-trace - version: {}", v);
}
public static void main(final String... args) {
System.out.println(VERSION);
}
}


@@ -1,24 +0,0 @@
package datadog.trace.common.util;
import java.util.concurrent.ThreadFactory;
/** A {@link ThreadFactory} implementation that starts all {@link Thread} as daemons. */
public final class DaemonThreadFactory implements ThreadFactory {
private final String threadName;
/**
* Constructs a new {@code DaemonThreadFactory}.
*
* @param threadName used to prefix all thread names.
*/
public DaemonThreadFactory(final String threadName) {
this.threadName = threadName;
}
@Override
public Thread newThread(final Runnable r) {
final Thread thread = new Thread(r, threadName);
thread.setDaemon(true);
return thread;
}
}
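
Typical usage, adapted from the DDAgentWriter removed below: every tracer-owned thread is created as a daemon so the tracer alone never keeps a host JVM from exiting.

final ScheduledExecutorService scheduledWriterExecutor =
    Executors.newScheduledThreadPool(1, new DaemonThreadFactory("dd-trace-writer"));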


@@ -1,586 +0,0 @@
package datadog.trace.common.writer;
import static datadog.trace.api.Config.DEFAULT_AGENT_HOST;
import static datadog.trace.api.Config.DEFAULT_AGENT_UNIX_DOMAIN_SOCKET;
import static datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT;
import static java.util.concurrent.TimeUnit.SECONDS;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.EventTranslator;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.util.DaemonThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
/**
* This writer buffers traces and sends them to the provided DDApi instance.
*
* <p>Written traces are passed off to a disruptor so as to avoid blocking the application's thread.
* If a flood of traces arrives that exceeds the disruptor ring size, the traces exceeding the
* threshold will be counted.
*/
@Slf4j
public class DDAgentWriter implements Writer {
private static final int DISRUPTOR_BUFFER_SIZE = 8192;
private static final int FLUSH_PAYLOAD_BYTES = 5_000_000; // 5 MB
private static final int FLUSH_PAYLOAD_DELAY = 1; // 1/second
private static final EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>> TRANSLATOR =
new EventTranslatorOneArg<Event<List<DDSpan>>, List<DDSpan>>() {
@Override
public void translateTo(
final Event<List<DDSpan>> event, final long sequence, final List<DDSpan> trace) {
event.data = trace;
}
};
private static final EventTranslator<Event<List<DDSpan>>> FLUSH_TRANSLATOR =
new EventTranslator<Event<List<DDSpan>>>() {
@Override
public void translateTo(final Event<List<DDSpan>> event, final long sequence) {
event.shouldFlush = true;
}
};
private static final ThreadFactory DISRUPTOR_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-disruptor");
private static final ThreadFactory SCHEDULED_FLUSH_THREAD_FACTORY =
new DaemonThreadFactory("dd-trace-writer");
private final Runnable flushTask = new FlushTask();
private final DDApi api;
private final int flushFrequencySeconds;
private final Disruptor<Event<List<DDSpan>>> disruptor;
private final ScheduledExecutorService scheduledWriterExecutor;
private final AtomicInteger traceCount = new AtomicInteger(0);
private final AtomicReference<ScheduledFuture<?>> flushSchedule = new AtomicReference<>();
private final Phaser apiPhaser;
private volatile boolean running = false;
private final Monitor monitor;
public DDAgentWriter() {
this(
new DDApi(DEFAULT_AGENT_HOST, DEFAULT_TRACE_AGENT_PORT, DEFAULT_AGENT_UNIX_DOMAIN_SOCKET),
new NoopMonitor());
}
public DDAgentWriter(final DDApi api, final Monitor monitor) {
this(api, monitor, DISRUPTOR_BUFFER_SIZE, FLUSH_PAYLOAD_DELAY);
}
/** Old signature (pre-Monitor) used in tests */
private DDAgentWriter(final DDApi api) {
this(api, new NoopMonitor());
}
/**
* Used in the tests.
*
* @param api
* @param disruptorSize Rounded up to next power of 2
* @param flushFrequencySeconds value < 1 disables scheduled flushes
*/
private DDAgentWriter(final DDApi api, final int disruptorSize, final int flushFrequencySeconds) {
this(api, new NoopMonitor(), disruptorSize, flushFrequencySeconds);
}
private DDAgentWriter(
final DDApi api,
final Monitor monitor,
final int disruptorSize,
final int flushFrequencySeconds) {
this.api = api;
this.monitor = monitor;
disruptor =
new Disruptor<>(
new DisruptorEventFactory<List<DDSpan>>(),
Math.max(2, Integer.highestOneBit(disruptorSize - 1) << 1), // Next power of 2
DISRUPTOR_THREAD_FACTORY,
ProducerType.MULTI,
new SleepingWaitStrategy(0, TimeUnit.MILLISECONDS.toNanos(5)));
disruptor.handleEventsWith(new TraceConsumer());
this.flushFrequencySeconds = flushFrequencySeconds;
scheduledWriterExecutor = Executors.newScheduledThreadPool(1, SCHEDULED_FLUSH_THREAD_FACTORY);
apiPhaser = new Phaser(); // Ensure API calls are completed when flushing
apiPhaser.register(); // Register on behalf of the scheduled executor thread.
}
// Exposing some statistics for consumption by monitors
public final long getDisruptorCapacity() {
return disruptor.getRingBuffer().getBufferSize();
}
public final long getDisruptorUtilizedCapacity() {
return getDisruptorCapacity() - getDisruptorRemainingCapacity();
}
public final long getDisruptorRemainingCapacity() {
return disruptor.getRingBuffer().remainingCapacity();
}
@Override
public void write(final List<DDSpan> trace) {
// We can't add events after shutdown otherwise it will never complete shutting down.
if (running) {
final boolean published = disruptor.getRingBuffer().tryPublishEvent(TRANSLATOR, trace);
if (published) {
monitor.onPublish(DDAgentWriter.this, trace);
} else {
// We're discarding the trace, but we still want to count it.
traceCount.incrementAndGet();
log.debug("Trace written to overfilled buffer. Counted but dropping trace: {}", trace);
monitor.onFailedPublish(this, trace);
}
} else {
log.debug("Trace written after shutdown. Ignoring trace: {}", trace);
monitor.onFailedPublish(this, trace);
}
}
@Override
public void incrementTraceCount() {
traceCount.incrementAndGet();
}
public DDApi getApi() {
return api;
}
@Override
public void start() {
disruptor.start();
running = true;
scheduleFlush();
monitor.onStart(this);
}
@Override
public void close() {
running = false;
boolean flushSuccess = true;
// We have to shutdown scheduled executor first to make sure no flush events issued after
// disruptor has been shutdown.
// Otherwise those events will never be processed and flush call will wait forever.
scheduledWriterExecutor.shutdown();
try {
scheduledWriterExecutor.awaitTermination(flushFrequencySeconds, SECONDS);
} catch (final InterruptedException e) {
log.warn("Waiting for flush executor shutdown interrupted.", e);
flushSuccess = false;
}
flushSuccess |= flush();
disruptor.shutdown();
monitor.onShutdown(this, flushSuccess);
}
/** This method will block until the flush is complete. */
public boolean flush() {
if (running) {
log.info("Flushing any remaining traces.");
// Register with the phaser so we can block until the flush completion.
apiPhaser.register();
disruptor.publishEvent(FLUSH_TRANSLATOR);
try {
// Allow thread to be interrupted.
apiPhaser.awaitAdvanceInterruptibly(apiPhaser.arriveAndDeregister());
return true;
} catch (final InterruptedException e) {
log.warn("Waiting for flush interrupted.", e);
return false;
}
} else {
return false;
}
}
@Override
public String toString() {
// DQH - I don't particularly like the instanceof check,
// but I decided it was preferable to adding an isNoop method onto
// Monitor or checking the result of Monitor#toString() to determine
// if something is *probably* the NoopMonitor.
String str = "DDAgentWriter { api=" + api;
if (!(monitor instanceof NoopMonitor)) {
str += ", monitor=" + monitor;
}
str += " }";
return str;
}
private void scheduleFlush() {
if (flushFrequencySeconds > 0 && !scheduledWriterExecutor.isShutdown()) {
final ScheduledFuture<?> previous =
flushSchedule.getAndSet(
scheduledWriterExecutor.schedule(flushTask, flushFrequencySeconds, SECONDS));
final boolean previousIncomplete = (previous != null);
if (previousIncomplete) {
previous.cancel(true);
}
monitor.onScheduleFlush(this, previousIncomplete);
}
}
private class FlushTask implements Runnable {
@Override
public void run() {
// Don't call flush() because it would block the thread also used for sending the traces.
disruptor.publishEvent(FLUSH_TRANSLATOR);
}
}
/** This class is intentionally not threadsafe. */
private class TraceConsumer implements EventHandler<Event<List<DDSpan>>> {
private List<byte[]> serializedTraces = new ArrayList<>();
private int payloadSize = 0;
@Override
public void onEvent(
final Event<List<DDSpan>> event, final long sequence, final boolean endOfBatch) {
final List<DDSpan> trace = event.data;
event.data = null; // clear the event for reuse.
if (trace != null) {
traceCount.incrementAndGet();
try {
final byte[] serializedTrace = api.serializeTrace(trace);
payloadSize += serializedTrace.length;
serializedTraces.add(serializedTrace);
monitor.onSerialize(DDAgentWriter.this, trace, serializedTrace);
} catch (final JsonProcessingException e) {
log.warn("Error serializing trace", e);
monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
} catch (final Throwable e) {
log.debug("Error while serializing trace", e);
monitor.onFailedSerialize(DDAgentWriter.this, trace, e);
}
}
if (event.shouldFlush || payloadSize >= FLUSH_PAYLOAD_BYTES) {
reportTraces();
event.shouldFlush = false;
}
}
private void reportTraces() {
try {
if (serializedTraces.isEmpty()) {
apiPhaser.arrive(); // Allow flush to return
return;
// scheduleFlush called in finally block.
}
final List<byte[]> toSend = serializedTraces;
serializedTraces = new ArrayList<>(toSend.size());
// ^ Initialize with similar size to reduce arraycopy churn.
final int representativeCount = traceCount.getAndSet(0);
final int sizeInBytes = payloadSize;
// Run the actual IO task on a different thread to avoid blocking the consumer.
scheduledWriterExecutor.execute(
new Runnable() {
@Override
public void run() {
try {
final DDApi.Response response =
api.sendSerializedTraces(representativeCount, sizeInBytes, toSend);
if (response.success()) {
log.debug("Successfully sent {} traces to the API", toSend.size());
monitor.onSend(DDAgentWriter.this, representativeCount, sizeInBytes, response);
} else {
log.debug(
"Failed to send {} traces (representing {}) of size {} bytes to the API",
toSend.size(),
representativeCount,
sizeInBytes);
monitor.onFailedSend(
DDAgentWriter.this, representativeCount, sizeInBytes, response);
}
} catch (final Throwable e) {
log.debug("Failed to send traces to the API: {}", e.getMessage());
// DQH - 10/2019 - DDApi should wrap most exceptions itself, so this really
// shouldn't occur.
// However, just to be safe to start, create a failed Response to handle any
// spurious Throwable-s.
monitor.onFailedSend(
DDAgentWriter.this,
representativeCount,
sizeInBytes,
DDApi.Response.failed(e));
} finally {
apiPhaser.arrive(); // Flush completed.
}
}
});
} finally {
payloadSize = 0;
scheduleFlush();
}
}
}
private static class Event<T> {
private volatile boolean shouldFlush = false;
private volatile T data = null;
}
private static class DisruptorEventFactory<T> implements EventFactory<Event<T>> {
@Override
public Event<T> newInstance() {
return new Event<>();
}
}
/**
* Callback interface for monitoring the health of the DDAgentWriter. Provides hooks for major
* lifecycle events...
*
* <ul>
* <li>start
* <li>shutdown
* <li>publishing to disruptor
* <li>serializing
* <li>sending to agent
* </ul>
*/
public interface Monitor {
void onStart(final DDAgentWriter agentWriter);
void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess);
void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace);
void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete);
void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace);
void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause);
void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response);
void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response);
}
public static final class NoopMonitor implements Monitor {
@Override
public void onStart(final DDAgentWriter agentWriter) {}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {}
@Override
public void onScheduleFlush(
final DDAgentWriter agentWriter, final boolean previousIncomplete) {}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {}
@Override
public String toString() {
return "NoOp";
}
}
public static final class StatsDMonitor implements Monitor {
public static final String PREFIX = "datadog.tracer";
public static final String LANG_TAG = "lang";
public static final String LANG_VERSION_TAG = "lang_version";
public static final String LANG_INTERPRETER_TAG = "lang_interpreter";
public static final String LANG_INTERPRETER_VENDOR_TAG = "lang_interpreter_vendor";
public static final String TRACER_VERSION_TAG = "tracer_version";
private final String hostInfo;
private final StatsDClient statsd;
// DQH - Made a conscious choice to not take a Config object here.
// Letting the creating of the Monitor take the Config,
// so it can decide which Monitor variant to create.
public StatsDMonitor(final String host, final int port) {
hostInfo = host + ":" + port;
statsd = new NonBlockingStatsDClient(PREFIX, host, port, getDefaultTags());
}
// Currently, intended for testing
private StatsDMonitor(final StatsDClient statsd) {
hostInfo = null;
this.statsd = statsd;
}
protected static final String[] getDefaultTags() {
return new String[] {
tag(LANG_TAG, "java"),
tag(LANG_VERSION_TAG, DDTraceOTInfo.JAVA_VERSION),
tag(LANG_INTERPRETER_TAG, DDTraceOTInfo.JAVA_VM_NAME),
tag(LANG_INTERPRETER_VENDOR_TAG, DDTraceOTInfo.JAVA_VM_VENDOR),
tag(TRACER_VERSION_TAG, DDTraceOTInfo.VERSION)
};
}
private static final String tag(final String tagPrefix, final String tagValue) {
return tagPrefix + ":" + tagValue;
}
@Override
public void onStart(final DDAgentWriter agentWriter) {
statsd.recordGaugeValue("queue.max_length", agentWriter.getDisruptorCapacity());
}
@Override
public void onShutdown(final DDAgentWriter agentWriter, final boolean flushSuccess) {}
@Override
public void onPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.accepted");
statsd.count("queue.accepted_lengths", trace.size());
}
@Override
public void onFailedPublish(final DDAgentWriter agentWriter, final List<DDSpan> trace) {
statsd.incrementCounter("queue.dropped");
}
@Override
public void onScheduleFlush(final DDAgentWriter agentWriter, final boolean previousIncomplete) {
// not recorded
}
@Override
public void onSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final byte[] serializedTrace) {
// DQH - Because of Java tracer's 2 phase acceptance and serialization scheme, this doesn't
// map precisely
statsd.count("queue.accepted_size", serializedTrace.length);
}
@Override
public void onFailedSerialize(
final DDAgentWriter agentWriter, final List<DDSpan> trace, final Throwable optionalCause) {
// TODO - DQH - make a new stat for serialization failure -- or maybe count this towards
// api.errors???
}
@Override
public void onSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
@Override
public void onFailedSend(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
onSendAttempt(agentWriter, representativeCount, sizeInBytes, response);
}
private void onSendAttempt(
final DDAgentWriter agentWriter,
final int representativeCount,
final int sizeInBytes,
final DDApi.Response response) {
statsd.incrementCounter("api.requests");
statsd.recordGaugeValue("queue.length", representativeCount);
// TODO: missing queue.spans (# of spans being sent)
statsd.recordGaugeValue("queue.size", sizeInBytes);
if (response.exception() != null) {
// covers communication errors -- both not receiving a response or
// receiving malformed response (even when otherwise successful)
statsd.incrementCounter("api.errors");
}
if (response.status() != null) {
statsd.incrementCounter("api.responses", "status: " + response.status());
}
}
public String toString() {
if (hostInfo == null) {
return "StatsD";
} else {
return "StatsD { host=" + hostInfo + " }";
}
}
}
}
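
One detail of the removed writer worth noting: the constructor rounds the requested disruptor size up to a power of two with a bit trick rather than a loop. The expression extracted into a standalone helper (hypothetical name, same semantics):

// Math.max(2, Integer.highestOneBit(n - 1) << 1) rounds n up to the next
// power of two with a floor of 2; exact powers of two map to themselves.
static int ringSize(final int requested) {
  return Math.max(2, Integer.highestOneBit(requested - 1) << 1);
}
// ringSize(8192) == 8192, ringSize(1000) == 1024, ringSize(3) == 4, ringSize(1) == 2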


@@ -1,373 +0,0 @@
package datadog.trace.common.writer;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.opentracing.ContainerInfo;
import datadog.opentracing.DDSpan;
import datadog.opentracing.DDTraceOTInfo;
import datadog.trace.common.writer.unixdomainsockets.UnixDomainSocketFactory;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okio.BufferedSink;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.jackson.dataformat.MessagePackFactory;
/** The API pointing to a DD agent */
@Slf4j
public class DDApi {
private static final String DATADOG_META_LANG = "Datadog-Meta-Lang";
private static final String DATADOG_META_LANG_VERSION = "Datadog-Meta-Lang-Version";
private static final String DATADOG_META_LANG_INTERPRETER = "Datadog-Meta-Lang-Interpreter";
private static final String DATADOG_META_LANG_INTERPRETER_VENDOR =
"Datadog-Meta-Lang-Interpreter-Vendor";
private static final String DATADOG_META_TRACER_VERSION = "Datadog-Meta-Tracer-Version";
private static final String DATADOG_CONTAINER_ID = "Datadog-Container-ID";
private static final String X_DATADOG_TRACE_COUNT = "X-Datadog-Trace-Count";
private static final int HTTP_TIMEOUT = 1; // 1 second for connect/read/write operations
private static final String TRACES_ENDPOINT_V3 = "v0.3/traces";
private static final String TRACES_ENDPOINT_V4 = "v0.4/traces";
private static final long MILLISECONDS_BETWEEN_ERROR_LOG = TimeUnit.MINUTES.toMillis(5);
private final List<ResponseListener> responseListeners = new ArrayList<>();
private volatile long nextAllowedLogTime = 0;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new MessagePackFactory());
private static final MediaType MSGPACK = MediaType.get("application/msgpack");
private final OkHttpClient httpClient;
private final HttpUrl tracesUrl;
public DDApi(final String host, final int port, final String unixDomainSocketPath) {
this(
host,
port,
traceEndpointAvailable(getUrl(host, port, TRACES_ENDPOINT_V4), unixDomainSocketPath),
unixDomainSocketPath);
}
DDApi(
final String host,
final int port,
final boolean v4EndpointsAvailable,
final String unixDomainSocketPath) {
httpClient = buildHttpClient(unixDomainSocketPath);
if (v4EndpointsAvailable) {
tracesUrl = getUrl(host, port, TRACES_ENDPOINT_V4);
} else {
log.debug("API v0.4 endpoints not available. Downgrading to v0.3");
tracesUrl = getUrl(host, port, TRACES_ENDPOINT_V3);
}
}
public void addResponseListener(final ResponseListener listener) {
if (!responseListeners.contains(listener)) {
responseListeners.add(listener);
}
}
/**
* Send traces to the DD agent
*
* @param traces the traces to be sent
* @return a Response object -- encapsulating success of communication, sending, and result
* parsing
*/
public Response sendTraces(final List<List<DDSpan>> traces) {
final List<byte[]> serializedTraces = new ArrayList<>(traces.size());
int sizeInBytes = 0;
for (final List<DDSpan> trace : traces) {
try {
final byte[] serializedTrace = serializeTrace(trace);
sizeInBytes += serializedTrace.length;
serializedTraces.add(serializedTrace);
} catch (final JsonProcessingException e) {
log.warn("Error serializing trace", e);
// TODO: DQH - Incorporate the failed serialization into the Response object???
}
}
return sendSerializedTraces(serializedTraces.size(), sizeInBytes, serializedTraces);
}
byte[] serializeTrace(final List<DDSpan> trace) throws JsonProcessingException {
return OBJECT_MAPPER.writeValueAsBytes(trace);
}
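// Note: each trace is already a standalone msgpack value here (Jackson with
// MessagePackFactory), so sendSerializedTraces below only needs to prepend a
// single array header (1, 3, or 5 bytes for msgpack's fixarray/array16/array32
// encodings) and can stream the pre-serialized traces through unchanged.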
Response sendSerializedTraces(
final int representativeCount, final Integer sizeInBytes, final List<byte[]> traces) {
try {
final RequestBody body =
new RequestBody() {
@Override
public MediaType contentType() {
return MSGPACK;
}
@Override
public long contentLength() {
final int traceCount = traces.size();
// Need to allocate additional to handle MessagePacker.packArrayHeader
if (traceCount < (1 << 4)) {
return sizeInBytes + 1; // byte
} else if (traceCount < (1 << 16)) {
return sizeInBytes + 3; // byte + short
} else {
return sizeInBytes + 5; // byte + int
}
}
@Override
public void writeTo(final BufferedSink sink) throws IOException {
final OutputStream out = sink.outputStream();
final MessagePacker packer = MessagePack.newDefaultPacker(out);
packer.packArrayHeader(traces.size());
for (final byte[] trace : traces) {
packer.writePayload(trace);
}
packer.close();
out.close();
}
};
final Request request =
prepareRequest(tracesUrl)
.addHeader(X_DATADOG_TRACE_COUNT, String.valueOf(representativeCount))
.put(body)
.build();
try (final okhttp3.Response response = httpClient.newCall(request).execute()) {
if (response.code() != 200) {
if (log.isDebugEnabled()) {
log.debug(
"Error while sending {} of {} traces to the DD agent. Status: {}, Response: {}, Body: {}",
traces.size(),
representativeCount,
response.code(),
response.message(),
response.body().string());
} else if (nextAllowedLogTime < System.currentTimeMillis()) {
nextAllowedLogTime = System.currentTimeMillis() + MILLISECONDS_BETWEEN_ERROR_LOG;
log.warn(
"Error while sending {} of {} traces to the DD agent. Status: {} {} (going silent for {} minutes)",
traces.size(),
representativeCount,
response.code(),
response.message(),
TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG));
}
return Response.failed(response.code());
}
log.debug(
"Successfully sent {} of {} traces to the DD agent.",
traces.size(),
representativeCount);
final String responseString = response.body().string().trim();
try {
if (!"".equals(responseString) && !"OK".equalsIgnoreCase(responseString)) {
final JsonNode parsedResponse = OBJECT_MAPPER.readTree(responseString);
final String endpoint = tracesUrl.toString();
for (final ResponseListener listener : responseListeners) {
listener.onResponse(endpoint, parsedResponse);
}
return Response.success(response.code(), parsedResponse);
}
return Response.success(response.code());
} catch (final JsonParseException e) {
log.debug("Failed to parse DD agent response: " + responseString, e);
return Response.success(response.code(), e);
}
}
} catch (final IOException e) {
if (log.isDebugEnabled()) {
log.debug(
"Error while sending "
+ traces.size()
+ " of "
+ representativeCount
+ " traces to the DD agent.",
e);
} else if (nextAllowedLogTime < System.currentTimeMillis()) {
nextAllowedLogTime = System.currentTimeMillis() + MILLISECONDS_BETWEEN_ERROR_LOG;
log.warn(
"Error while sending {} of {} traces to the DD agent. {}: {} (going silent for {} minutes)",
traces.size(),
representativeCount,
e.getClass().getName(),
e.getMessage(),
TimeUnit.MILLISECONDS.toMinutes(MILLISECONDS_BETWEEN_ERROR_LOG));
}
return Response.failed(e);
}
}
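(Aside: the contentLength arithmetic above mirrors MessagePack's array-header encoding: a fixarray header is 1 byte for up to 15 elements, an array16 header is 3 bytes (marker byte + 16-bit length) for up to 65535 elements, and an array32 header is 5 bytes (marker byte + 32-bit length) beyond that. A standalone sketch verifying those sizes -- illustrative, not part of this file:

    import org.msgpack.core.MessageBufferPacker;
    import org.msgpack.core.MessagePack;

    final class ArrayHeaderSizes {
      public static void main(final String[] args) throws Exception {
        for (final int count : new int[] {15, 16, 65535, 65536}) {
          final MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
          packer.packArrayHeader(count); // header only, no elements
          packer.close();
          // prints: 15 -> 1, 16 -> 3, 65535 -> 3, 65536 -> 5
          System.out.println(count + " -> " + packer.toByteArray().length);
        }
      }
    }
)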
private static boolean traceEndpointAvailable(
final HttpUrl url, final String unixDomainSocketPath) {
return endpointAvailable(url, unixDomainSocketPath, Collections.emptyList(), true);
}
private static boolean endpointAvailable(
final HttpUrl url,
final String unixDomainSocketPath,
final Object data,
final boolean retry) {
try {
final OkHttpClient client = buildHttpClient(unixDomainSocketPath);
final RequestBody body = RequestBody.create(MSGPACK, OBJECT_MAPPER.writeValueAsBytes(data));
final Request request = prepareRequest(url).put(body).build();
try (final okhttp3.Response response = client.newCall(request).execute()) {
return response.code() == 200;
}
} catch (final IOException e) {
if (retry) {
return endpointAvailable(url, unixDomainSocketPath, data, false);
}
}
return false;
}
private static OkHttpClient buildHttpClient(final String unixDomainSocketPath) {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
if (unixDomainSocketPath != null) {
builder = builder.socketFactory(new UnixDomainSocketFactory(new File(unixDomainSocketPath)));
}
return builder
.connectTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS)
.writeTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS)
.readTimeout(HTTP_TIMEOUT, TimeUnit.SECONDS)
.build();
}
private static HttpUrl getUrl(final String host, final int port, final String endPoint) {
return new HttpUrl.Builder()
.scheme("http")
.host(host)
.port(port)
.addEncodedPathSegments(endPoint)
.build();
}
private static Request.Builder prepareRequest(final HttpUrl url) {
final Request.Builder builder =
new Request.Builder()
.url(url)
.addHeader(DATADOG_META_LANG, "java")
.addHeader(DATADOG_META_LANG_VERSION, DDTraceOTInfo.JAVA_VERSION)
.addHeader(DATADOG_META_LANG_INTERPRETER, DDTraceOTInfo.JAVA_VM_NAME)
.addHeader(DATADOG_META_LANG_INTERPRETER_VENDOR, DDTraceOTInfo.JAVA_VM_VENDOR)
.addHeader(DATADOG_META_TRACER_VERSION, DDTraceOTInfo.VERSION);
final String containerId = ContainerInfo.get().getContainerId();
if (containerId == null) {
return builder;
} else {
return builder.addHeader(DATADOG_CONTAINER_ID, containerId);
}
}
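(Aside: a request built by prepareRequest therefore carries headers along these lines; all values below are hypothetical and vary by JVM and tracer build:

    // Datadog-Meta-Lang: java
    // Datadog-Meta-Lang-Version: 1.8.0_222                     (hypothetical)
    // Datadog-Meta-Lang-Interpreter: OpenJDK 64-Bit Server VM  (hypothetical)
    // Datadog-Meta-Lang-Interpreter-Vendor: Oracle Corporation (hypothetical)
    // Datadog-Meta-Tracer-Version: 0.1.0                       (hypothetical)
    // Datadog-Container-ID: <only added when a container id is detected>
)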
@Override
public String toString() {
return "DDApi { tracesUrl=" + tracesUrl + " }";
}
/**
* Encapsulates an attempted response from the Datadog agent.
*
   * <p>If communication fails or times out, the Response will NOT be successful and will lack a
   * status code, but will have an exception.
   *
   * <p>If communication occurs, the Response will have a status code and will be marked as
   * success or failure in accordance with the code.
*
* <p>NOTE: A successful communication may still contain an exception if there was a problem
* parsing the response from the Datadog agent.
*/
public static final class Response {
/** Factory method for a successful request with a trivial response body */
public static final Response success(final int status) {
return new Response(true, status, null, null);
}
/** Factory method for a successful request with a well-formed JSON response body */
public static final Response success(final int status, final JsonNode json) {
return new Response(true, status, json, null);
}
    /** Factory method for a successful request with a malformed response body */
public static final Response success(final int status, final Throwable exception) {
return new Response(true, status, null, exception);
}
    /** Factory method for a request that receives an error status in response */
public static final Response failed(final int status) {
return new Response(false, status, null, null);
}
/** Factory method for a failed communication attempt */
public static final Response failed(final Throwable exception) {
return new Response(false, null, null, exception);
}
private final boolean success;
private final Integer status;
private final JsonNode json;
private final Throwable exception;
private Response(
final boolean success,
final Integer status,
final JsonNode json,
final Throwable exception) {
this.success = success;
this.status = status;
this.json = json;
this.exception = exception;
}
public final boolean success() {
return this.success;
}
    // TODO: DQH - In Java 8, switch to OptionalInt
public final Integer status() {
return this.status;
}
public final JsonNode json() {
return this.json;
}
// TODO: DQH - In Java 8, switch to Optional<Throwable>?
public final Throwable exception() {
return this.exception;
}
}
public interface ResponseListener {
    /** Invoked after the API receives a response from the core agent. */
void onResponse(String endpoint, JsonNode responseJson);
}
}
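(Aside: taken together, a minimal usage sketch of this now-removed client; everything below is illustrative, with the default agent address taken from the tests later in this diff:

    // Probes v0.4 availability on construction and falls back to v0.3 if needed.
    final DDApi api = new DDApi("localhost", 8126, null);
    final DDApi.Response response = api.sendTraces(Collections.<List<DDSpan>>emptyList());
    if (!response.success()) {
      System.err.println("send failed, status=" + response.status());
    }
)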

View File

@ -1,19 +1,17 @@
package datadog.trace.common.writer;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.opentracing.DDSpan;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LoggingWriter implements Writer {
private final ObjectMapper serializer = new ObjectMapper();
@Override
public void write(final List<DDSpan> trace) {
if (log.isInfoEnabled()) {
if (log.isInfoEnabled() && !trace.isEmpty()) {
try {
log.info("write(trace): {}", serializer.writeValueAsString(trace));
log.info("write(trace): {}", trace.get(0).getTraceId());
} catch (final Exception e) {
log.error("error writing(trace): {}", trace);
}
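(Aside: a hedged reconstruction of the resulting method after this diff, assembled from the added lines above -- it now logs only the first span's trace id, consistent with this commit dropping the Jackson ObjectMapper from the writer:

    @Override
    public void write(final List<DDSpan> trace) {
      if (log.isInfoEnabled() && !trace.isEmpty()) {
        try {
          log.info("write(trace): {}", trace.get(0).getTraceId());
        } catch (final Exception e) {
          log.error("error writing(trace): {}", trace);
        }
      }
    }
)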

View File

@ -4,7 +4,6 @@ import datadog.opentracing.DDSpan;
import datadog.trace.api.Config;
import java.io.Closeable;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
/** A writer is responsible to send collected spans to some place */
@ -38,56 +37,23 @@ public interface Writer extends Closeable {
if (config != null) {
final String configuredType = config.getWriterType();
if (Config.DD_AGENT_WRITER_TYPE.equals(configuredType)) {
writer = createAgentWriter(config);
} else if (Config.LOGGING_WRITER_TYPE.equals(configuredType)) {
if (Config.LOGGING_WRITER_TYPE.equals(configuredType)) {
writer = new LoggingWriter();
} else {
log.warn(
"Writer type not configured correctly: Type {} not recognized. Defaulting to DDAgentWriter.",
"Writer type not configured correctly: Type {} not recognized. Defaulting to LoggingWriter.",
configuredType);
writer = createAgentWriter(config);
writer = new LoggingWriter();
}
} else {
log.warn(
"Writer type not configured correctly: No config provided! Defaulting to DDAgentWriter.");
writer = new DDAgentWriter();
"Writer type not configured correctly: No config provided! Defaulting to LoggingWriter.");
writer = new LoggingWriter();
}
return writer;
}
public static Writer forConfig(final Properties config) {
return forConfig(Config.get(config));
}
private static Writer createAgentWriter(final Config config) {
return new DDAgentWriter(createApi(config), createMonitor(config));
}
private static final DDApi createApi(final Config config) {
return new DDApi(
config.getAgentHost(), config.getAgentPort(), config.getAgentUnixDomainSocket());
}
private static final DDAgentWriter.Monitor createMonitor(final Config config) {
if (!config.isHealthMetricsEnabled()) {
return new DDAgentWriter.NoopMonitor();
} else {
String host = config.getHealthMetricsStatsdHost();
if (host == null) {
host = config.getAgentHost();
}
Integer port = config.getHealthMetricsStatsdPort();
if (port == null) {
return new DDAgentWriter.NoopMonitor();
}
return new DDAgentWriter.StatsDMonitor(host, port);
}
}
private Builder() {}
}
}
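(Aside: because the removed and added lines are interleaved above, here is a hedged reconstruction of the builder branch after this change; every path now yields a LoggingWriter:

    Writer writer;
    if (config != null) {
      final String configuredType = config.getWriterType();
      if (Config.LOGGING_WRITER_TYPE.equals(configuredType)) {
        writer = new LoggingWriter();
      } else {
        log.warn(
            "Writer type not configured correctly: Type {} not recognized. Defaulting to LoggingWriter.",
            configuredType);
        writer = new LoggingWriter();
      }
    } else {
      log.warn(
          "Writer type not configured correctly: No config provided! Defaulting to LoggingWriter.");
      writer = new LoggingWriter();
    }
    return writer;
)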

View File

@ -1,51 +0,0 @@
package datadog.trace.common.writer.unixdomainsockets;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import jnr.unixsocket.UnixSocket;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
/**
* Subtype UNIX socket for a higher-fidelity impersonation of TCP sockets. This is named "tunneling"
* because it assumes the ultimate destination has a hostname and port.
*
* <p>Copied from <a
* href="https://github.com/square/okhttp/blob/master/samples/unixdomainsockets/src/main/java/okhttp3/unixdomainsockets/UnixDomainServerSocketFactory.java">okHttp
* examples</a>.
*/
final class TunnelingUnixSocket extends UnixSocket {
private final File path;
private InetSocketAddress inetSocketAddress;
TunnelingUnixSocket(final File path, final UnixSocketChannel channel) {
super(channel);
this.path = path;
}
TunnelingUnixSocket(
final File path, final UnixSocketChannel channel, final InetSocketAddress address) {
this(path, channel);
inetSocketAddress = address;
}
@Override
public void connect(final SocketAddress endpoint) throws IOException {
inetSocketAddress = (InetSocketAddress) endpoint;
super.connect(new UnixSocketAddress(path), 0);
}
@Override
public void connect(final SocketAddress endpoint, final int timeout) throws IOException {
inetSocketAddress = (InetSocketAddress) endpoint;
super.connect(new UnixSocketAddress(path), timeout);
}
@Override
public InetAddress getInetAddress() {
return inetSocketAddress.getAddress();
}
}

View File

@ -1,58 +0,0 @@
package datadog.trace.common.writer.unixdomainsockets;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import javax.net.SocketFactory;
import jnr.unixsocket.UnixSocketChannel;
/**
* Impersonate TCP-style SocketFactory over UNIX domain sockets.
*
* <p>Copied from <a
* href="https://github.com/square/okhttp/blob/master/samples/unixdomainsockets/src/main/java/okhttp3/unixdomainsockets/UnixDomainSocketFactory.java">okHttp
* examples</a>.
*/
public final class UnixDomainSocketFactory extends SocketFactory {
private final File path;
public UnixDomainSocketFactory(final File path) {
this.path = path;
}
@Override
public Socket createSocket() throws IOException {
final UnixSocketChannel channel = UnixSocketChannel.open();
return new TunnelingUnixSocket(path, channel);
}
@Override
public Socket createSocket(final String host, final int port) throws IOException {
final Socket result = createSocket();
result.connect(new InetSocketAddress(host, port));
return result;
}
@Override
public Socket createSocket(
final String host, final int port, final InetAddress localHost, final int localPort)
throws IOException {
return createSocket(host, port);
}
@Override
public Socket createSocket(final InetAddress host, final int port) throws IOException {
final Socket result = createSocket();
result.connect(new InetSocketAddress(host, port));
return result;
}
@Override
public Socket createSocket(
final InetAddress host, final int port, final InetAddress localAddress, final int localPort)
throws IOException {
return createSocket(host, port);
}
}
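(Aside: as DDApi.buildHttpClient earlier in this diff shows, this factory plugs directly into OkHttp. A minimal illustrative sketch -- the socket path is hypothetical:

    final OkHttpClient client =
        new OkHttpClient.Builder()
            .socketFactory(new UnixDomainSocketFactory(new File("/var/run/datadog/apm.socket")))
            .build();
    // Requests keep an ordinary http://host:port URL; the bytes travel over the UNIX socket.
)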

View File

@ -1,147 +0,0 @@
package datadog.opentracing
import datadog.trace.util.test.DDSpecification
import spock.lang.Unroll
class ContainerInfoTest extends DDSpecification {
@Unroll
def "CGroupInfo is parsed from individual lines"() {
when:
ContainerInfo.CGroupInfo cGroupInfo = ContainerInfo.parseLine(line)
then:
cGroupInfo.getId() == id
cGroupInfo.getPath() == path
cGroupInfo.getControllers() == controllers
cGroupInfo.getContainerId() == containerId
cGroupInfo.podId == podId
    // Examples from the container tagging RFC and Qard/container-info
where:
id | controllers | path | containerId | podId | line
// Docker examples
13 | ["name=systemd"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "13:name=systemd:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
12 | ["pids"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "12:pids:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
11 | ["hugetlb"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "11:hugetlb:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
10 | ["net_prio"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "10:net_prio:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
9 | ["perf_event"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "9:perf_event:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
8 | ["net_cls"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "8:net_cls:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
7 | ["freezer"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "7:freezer:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
6 | ["devices"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "6:devices:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
5 | ["memory"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "5:memory:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
4 | ["blkio"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "4:blkio:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
3 | ["cpuacct"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "3:cpuacct:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
2 | ["cpu"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "2:cpu:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
1 | ["cpuset"] | "/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | "3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | "1:cpuset:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"
    // Kubernetes examples
11 | ["perf_event"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "11:perf_event:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
10 | ["pids"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "10:pids:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
9 | ["memory"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "9:memory:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
8 | ["cpu", "cpuacct"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "8:cpu,cpuacct:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
7 | ["blkio"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "7:blkio:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
6 | ["cpuset"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "6:cpuset:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
5 | ["devices"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "5:devices:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
4 | ["freezer"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "4:freezer:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
3 | ["net_cls", "net_prio"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "3:net_cls,net_prio:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
2 | ["hugetlb"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "2:hugetlb:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
1 | ["name=systemd"] | "/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | "1:name=systemd:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"
    // ECS examples
9 | ["perf_event"] | "/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | "38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | "9:perf_event:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"
8 | ["memory"] | "/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | "38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | "8:memory:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"
7 | ["hugetlb"] | "/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | "38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | "7:hugetlb:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"
6 | ["freezer"] | "/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | "38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | "6:freezer:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"
5 | ["devices"] | "/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | "38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | "5:devices:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"
4 | ["cpuset"] | "/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | "38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | "4:cpuset:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"
3 | ["cpuacct"] | "/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | "38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | "3:cpuacct:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"
2 | ["cpu"] | "/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | "38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | "2:cpu:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"
1 | ["blkio"] | "/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | "38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | "1:blkio:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"
    // Fargate examples
11 | ["hugetlb"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "11:hugetlb:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
10 | ["pids"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "10:pids:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
9 | ["cpuset"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "9:cpuset:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
8 | ["net_cls", "net_prio"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "8:net_cls,net_prio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
7 | ["cpu", "cpuacct"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "7:cpu,cpuacct:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
6 | ["perf_event"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "6:perf_event:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
5 | ["freezer"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "5:freezer:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
4 | ["devices"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "4:devices:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
3 | ["blkio"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "3:blkio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
2 | ["memory"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "2:memory:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
1 | ["name=systemd"] | "/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | "432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | "1:name=systemd:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"
    // Reference implementation examples
1 | ["name=systemd"] | "/system.slice/docker-cde7c2bab394630a42d73dc610b9c57415dced996106665d427f6d0566594411.scope" | "cde7c2bab394630a42d73dc610b9c57415dced996106665d427f6d0566594411" | null | "1:name=systemd:/system.slice/docker-cde7c2bab394630a42d73dc610b9c57415dced996106665d427f6d0566594411.scope"
1 | ["name=systemd"] | "/docker/051e2ee0bce99116029a13df4a9e943137f19f957f38ac02d6bad96f9b700f76/not_hex" | null | null | "1:name=systemd:/docker/051e2ee0bce99116029a13df4a9e943137f19f957f38ac02d6bad96f9b700f76/not_hex"
1 | ["name=systemd"] | "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod90d81341_92de_11e7_8cf2_507b9d4141fa.slice/crio-2227daf62df6694645fee5df53c1f91271546a9560e8600a525690ae252b7f63.scope" | "2227daf62df6694645fee5df53c1f91271546a9560e8600a525690ae252b7f63" | "90d81341_92de_11e7_8cf2_507b9d4141fa" | "1:name=systemd:/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod90d81341_92de_11e7_8cf2_507b9d4141fa.slice/crio-2227daf62df6694645fee5df53c1f91271546a9560e8600a525690ae252b7f63.scope"
}
@Unroll
def "Container info parsed from file content"() {
when:
ContainerInfo containerInfo = ContainerInfo.parse(content)
then:
containerInfo.getContainerId() == containerId
containerInfo.getPodId() == podId
containerInfo.getCGroups().size() == size
where:
containerId | podId | size | content
// Docker
"3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860" | null | 13 | """13:name=systemd:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
12:pids:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
11:hugetlb:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
10:net_prio:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
9:perf_event:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
8:net_cls:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
7:freezer:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
6:devices:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
5:memory:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
4:blkio:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
3:cpuacct:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
2:cpu:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
1:cpuset:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860"""
// Kubernetes
"3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1" | "3d274242-8ee0-11e9-a8a6-1e68d864ef1a" | 11 | """11:perf_event:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
10:pids:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
9:memory:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
8:cpu,cpuacct:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
7:blkio:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
6:cpuset:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
5:devices:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
4:freezer:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
3:net_cls,net_prio:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
2:hugetlb:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
1:name=systemd:/kubepods/besteffort/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1"""
// ECS
"38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce" | null | 9 | """9:perf_event:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
8:memory:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
7:hugetlb:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
6:freezer:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
5:devices:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
4:cpuset:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
3:cpuacct:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
2:cpu:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
1:blkio:/ecs/haissam-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce"""
// Fargate
"432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da" | null | 11 | """11:hugetlb:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
10:pids:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
9:cpuset:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
8:net_cls,net_prio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
7:cpu,cpuacct:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
6:perf_event:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
5:freezer:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
4:devices:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
3:blkio:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
2:memory:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
1:name=systemd:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da"""
}
}
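(Aside: every sample line above follows the /proc/self/cgroup format id:controllers:path. A hedged sketch of the split these tests exercise -- the real ContainerInfo.parseLine also extracts container and pod ids, which is not reproduced here:

    // Illustrative only: split one cgroup line into the fields asserted above.
    final String line =
        "2:cpu:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860";
    final String[] parts = line.split(":", 3);
    final int id = Integer.parseInt(parts[0]);                            // 2
    final List<String> controllers = Arrays.asList(parts[1].split(","));  // ["cpu"]
    final String path = parts[2];                                         // everything after the second colon
)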

View File

@ -1,59 +0,0 @@
package datadog.opentracing
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.trace.common.writer.ListWriter
import datadog.trace.util.test.DDSpecification
import org.msgpack.core.MessagePack
import org.msgpack.core.buffer.ArrayBufferInput
import org.msgpack.jackson.dataformat.MessagePackFactory
import org.msgpack.value.ValueType
class DDSpanSerializationTest extends DDSpecification {
def "serialize trace/span with id #value as int"() {
setup:
def objectMapper = new ObjectMapper(new MessagePackFactory())
def writer = new ListWriter()
def tracer = new DDTracer(writer)
def context = new DDSpanContext(
value,
value,
0G,
"fakeService",
"fakeOperation",
"fakeResource",
false,
"fakeType",
Collections.emptyMap(),
new PendingTrace(tracer, 1G),
tracer)
def span = new DDSpan(0, context)
byte[] bytes = objectMapper.writeValueAsBytes(span)
def unpacker = MessagePack.newDefaultUnpacker(new ArrayBufferInput(bytes))
int size = unpacker.unpackMapHeader()
expect:
for (int i = 0; i < size; i++) {
String key = unpacker.unpackString()
switch (key) {
case "trace_id":
case "span_id":
assert unpacker.nextFormat.valueType == ValueType.INTEGER
assert unpacker.unpackBigInteger() == value
break
default:
unpacker.unpackValue()
}
}
where:
value | _
0G | _
1G | _
8223372036854775807G | _
BigInteger.valueOf(Long.MAX_VALUE).subtract(1G) | _
BigInteger.valueOf(Long.MAX_VALUE).add(1G) | _
2G.pow(64).subtract(1G) | _
}
}
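(Aside: the id values above deliberately straddle Long.MAX_VALUE. Trace and span ids are unsigned 64-bit, so they are modeled as BigInteger yet must still serialize as a MessagePack integer. A minimal round-trip sketch, illustrative and not part of this file, using org.msgpack.core and java.math.BigInteger:

    final BigInteger id = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE);
    final MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
    packer.packBigInteger(id); // accepted because the value fits in 64 unsigned bits
    packer.close();
    final MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(packer.toByteArray());
    assert unpacker.unpackBigInteger().equals(id);
)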

View File

@ -50,7 +50,7 @@ class DDSpanTest extends DDSpecification {
when:
span.setSpanType("type")
then:
span.getType() == "type"
span.getSpanType() == "type"
}
def "resource name equals operation name if null"() {

View File

@ -3,7 +3,6 @@ package datadog.trace
import datadog.opentracing.DDTracer
import datadog.opentracing.propagation.DatadogHttpCodec
import datadog.trace.api.Config
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.ListWriter
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.util.test.DDSpecification
@ -12,8 +11,6 @@ import org.junit.contrib.java.lang.system.EnvironmentVariables
import org.junit.contrib.java.lang.system.RestoreSystemProperties
import static datadog.trace.api.Config.DEFAULT_SERVICE_NAME
import static datadog.trace.api.Config.HEALTH_METRICS_ENABLED
import static datadog.trace.api.Config.HEALTH_METRICS_STATSD_PORT
import static datadog.trace.api.Config.PREFIX
import static datadog.trace.api.Config.SPAN_TAGS
import static datadog.trace.api.Config.WRITER_TYPE
@ -41,12 +38,6 @@ class DDTracerTest extends DDSpecification {
then:
tracer.serviceName == "unnamed-java-app"
tracer.writer instanceof DDAgentWriter
((DDAgentWriter) tracer.writer).api.tracesUrl.host() == "localhost"
((DDAgentWriter) tracer.writer).api.tracesUrl.port() == 8126
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.3/traces" ||
((DDAgentWriter) tracer.writer).api.tracesUrl.encodedPath() == "/v0.4/traces"
tracer.writer.monitor instanceof DDAgentWriter.NoopMonitor
tracer.spanContextDecorators.size() == 12
@ -54,20 +45,6 @@ class DDTracerTest extends DDSpecification {
tracer.extractor instanceof DatadogHttpCodec.Extractor
}
def "verify enabling health monitor"() {
setup:
System.setProperty(PREFIX + HEALTH_METRICS_ENABLED, "true")
System.setProperty(PREFIX + HEALTH_METRICS_STATSD_PORT, "8125")
when:
def tracer = new DDTracer(new Config())
then:
tracer.writer.monitor instanceof DDAgentWriter.StatsDMonitor
tracer.writer.monitor.hostInfo == "localhost:8125"
}
def "verify overriding writer"() {
setup:
System.setProperty(PREFIX + WRITER_TYPE, "LoggingWriter")
@ -96,46 +73,6 @@ class DDTracerTest extends DDSpecification {
"a:b,c:d,e:" | [a: "b", c: "d"]
}
def "verify overriding host"() {
when:
System.setProperty(PREFIX + key, value)
def tracer = new DDTracer(new Config())
then:
tracer.writer instanceof DDAgentWriter
((DDAgentWriter) tracer.writer).api.tracesUrl.host() == value
((DDAgentWriter) tracer.writer).api.tracesUrl.port() == 8126
where:
key | value
"agent.host" | "somethingelse"
}
def "verify overriding port"() {
when:
System.setProperty(PREFIX + key, value)
def tracer = new DDTracer(new Config())
then:
tracer.writer instanceof DDAgentWriter
((DDAgentWriter) tracer.writer).api.tracesUrl.host() == "localhost"
((DDAgentWriter) tracer.writer).api.tracesUrl.port() == Integer.valueOf(value)
where:
key | value
"agent.port" | "777"
"trace.agent.port" | "9999"
}
def "Writer is instance of LoggingWriter when property set"() {
when:
System.setProperty(PREFIX + "writer.type", "LoggingWriter")
def tracer = new DDTracer(new Config())
then:
tracer.writer instanceof LoggingWriter
}
def "verify writer constructor"() {
setup:
def writer = new ListWriter()

View File

@ -1,575 +0,0 @@
package datadog.trace.api.writer
import com.timgroup.statsd.StatsDClient
import datadog.opentracing.DDSpan
import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer
import datadog.opentracing.PendingTrace
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.DDApi
import datadog.trace.util.test.DDSpecification
import spock.lang.Timeout
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import static datadog.opentracing.SpanFactory.newSpanOf
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
import static datadog.trace.common.writer.DDAgentWriter.DISRUPTOR_BUFFER_SIZE
@Timeout(20)
class DDAgentWriterTest extends DDSpecification {
def api = Mock(DDApi)
def "test happy path"() {
setup:
def writer = new DDAgentWriter(api, 2, -1)
writer.start()
when:
writer.write(trace)
writer.write(trace)
writer.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
0 * _
cleanup:
writer.close()
where:
trace = [newSpanOf(0, "fixed-thread-name")]
}
def "test flood of traces"() {
setup:
def writer = new DDAgentWriter(api, disruptorSize, -1)
writer.start()
when:
(1..traceCount).each {
writer.write(trace)
}
writer.flush()
then:
_ * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(traceCount, _, { it.size() < traceCount })
0 * _
cleanup:
writer.close()
where:
trace = [newSpanOf(0, "fixed-thread-name")]
disruptorSize = 2
    traceCount = 100 // Shouldn't trigger a payload flush, but is bigger than the disruptor size.
}
def "test flush by size"() {
setup:
def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1)
def phaser = writer.apiPhaser
writer.start()
phaser.register()
when:
(1..6).each {
writer.write(trace)
}
// Wait for 2 flushes of 3 by size
phaser.awaitAdvanceInterruptibly(phaser.arrive())
phaser.awaitAdvanceInterruptibly(phaser.arriveAndDeregister())
then:
6 * api.serializeTrace(_) >> { trace -> callRealMethod() }
2 * api.sendSerializedTraces(3, _, { it.size() == 3 })
when:
(1..2).each {
writer.write(trace)
}
// Flush the remaining 2
writer.flush()
then:
2 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(2, _, { it.size() == 2 })
0 * _
cleanup:
writer.close()
where:
span = [newSpanOf(0, "fixed-thread-name")]
trace = (0..10000).collect { span }
}
def "test flush by time"() {
setup:
def writer = new DDAgentWriter(api)
def phaser = writer.apiPhaser
phaser.register()
writer.start()
writer.flush()
when:
(1..5).each {
writer.write(trace)
}
phaser.awaitAdvanceInterruptibly(phaser.arriveAndDeregister())
then:
5 * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(5, _, { it.size() == 5 })
0 * _
cleanup:
writer.close()
where:
span = [newSpanOf(0, "fixed-thread-name")]
trace = (1..10).collect { span }
}
def "test default buffer size"() {
setup:
def writer = new DDAgentWriter(api, DISRUPTOR_BUFFER_SIZE, -1)
writer.start()
when:
(0..maxedPayloadTraceCount).each {
writer.write(minimalTrace)
def start = System.nanoTime()
      // Consumer processes a trace in about 20 microseconds.
while (System.nanoTime() - start < TimeUnit.MICROSECONDS.toNanos(100)) {
// Busywait because we don't want to fill up the ring buffer
}
}
writer.flush()
then:
(maxedPayloadTraceCount + 1) * api.serializeTrace(_) >> { trace -> callRealMethod() }
1 * api.sendSerializedTraces(maxedPayloadTraceCount, _, { it.size() == maxedPayloadTraceCount })
cleanup:
writer.close()
where:
minimalContext = new DDSpanContext(
1G,
1G,
0G,
"",
"",
"",
false,
"",
Collections.emptyMap(),
Mock(PendingTrace),
Mock(DDTracer))
minimalSpan = new DDSpan(0, minimalContext)
minimalTrace = [minimalSpan]
traceSize = DDApi.OBJECT_MAPPER.writeValueAsBytes(minimalTrace).length
maxedPayloadTraceCount = ((int) (DDAgentWriter.FLUSH_PAYLOAD_BYTES / traceSize)) + 1
}
def "check that are no interactions after close"() {
setup:
def writer = new DDAgentWriter(api)
writer.start()
when:
writer.close()
writer.write([])
writer.flush()
then:
0 * _
writer.traceCount.get() == 0
}
def createMinimalTrace() {
def minimalContext = new DDSpanContext(
1G,
1G,
0G,
"",
"",
"",
false,
"",
Collections.emptyMap(),
Mock(PendingTrace),
Mock(DDTracer))
def minimalSpan = new DDSpan(0, minimalContext)
def minimalTrace = [minimalSpan]
return minimalTrace
}
def "monitor happy path"() {
setup:
def minimalTrace = createMinimalTrace()
    // DQH -- need to set up a dummy agent for the final send callback to work
def agent = httpServer {
handlers {
put("v0.4/traces") {
response.status(200).send()
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
def monitor = Mock(DDAgentWriter.Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
writer.start()
then:
1 * monitor.onStart(writer)
when:
writer.write(minimalTrace)
writer.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
1 * monitor.onSerialize(writer, minimalTrace, _)
1 * monitor.onScheduleFlush(writer, _)
1 * monitor.onSend(writer, 1, _, { response -> response.success() && response.status() == 200 })
when:
writer.close()
then:
1 * monitor.onShutdown(writer, true)
cleanup:
agent.close()
}
def "monitor agent returns error"() {
setup:
def minimalTrace = createMinimalTrace()
    // DQH -- need to set up a dummy agent for the final send callback to work
def first = true
def agent = httpServer {
handlers {
put("v0.4/traces") {
        // DQH - DDApi sniffs for endpoint existence, so respond with 200 the first time
if (first) {
response.status(200).send()
first = false
} else {
response.status(500).send()
}
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
def monitor = Mock(DDAgentWriter.Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
writer.start()
then:
1 * monitor.onStart(writer)
when:
writer.write(minimalTrace)
writer.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
1 * monitor.onSerialize(writer, minimalTrace, _)
1 * monitor.onScheduleFlush(writer, _)
1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == 500 })
when:
writer.close()
then:
1 * monitor.onShutdown(writer, true)
cleanup:
agent.close()
}
def "unreachable agent test"() {
setup:
def minimalTrace = createMinimalTrace()
def api = new DDApi("localhost", 8192, null) {
DDApi.Response sendSerializedTraces(
int representativeCount,
Integer sizeInBytes,
List<byte[]> traces) {
// simulating a communication failure to a server
return DDApi.Response.failed(new IOException("comm error"))
}
}
def monitor = Mock(DDAgentWriter.Monitor)
def writer = new DDAgentWriter(api, monitor)
when:
writer.start()
then:
1 * monitor.onStart(writer)
when:
writer.write(minimalTrace)
writer.flush()
then:
1 * monitor.onPublish(writer, minimalTrace)
1 * monitor.onSerialize(writer, minimalTrace, _)
1 * monitor.onScheduleFlush(writer, _)
1 * monitor.onFailedSend(writer, 1, _, { response -> !response.success() && response.status() == null })
when:
writer.close()
then:
1 * monitor.onShutdown(writer, true)
}
def "slow response test"() {
def numPublished = 0
def numFailedPublish = 0
setup:
def minimalTrace = createMinimalTrace()
    // Need to set up a dummy agent for the final send callback to work
def first = true
def agent = httpServer {
handlers {
put("v0.4/traces") {
        // DDApi sniffs for endpoint existence, so respond quickly the first time,
        // then slowly thereafter
if (!first) {
// Long enough to stall the pipeline, but not long enough to fail
Thread.sleep(2_500)
}
response.status(200).send()
first = false
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
def monitor = Stub(DDAgentWriter.Monitor)
monitor.onPublish(_, _) >> {
numPublished += 1
}
monitor.onFailedPublish(_, _) >> {
numFailedPublish += 1
}
def bufferSize = 32
def writer = new DDAgentWriter(api, monitor, bufferSize, DDAgentWriter.FLUSH_PAYLOAD_DELAY)
writer.start()
when:
// write & flush a single trace -- the slow agent response will cause
// additional writes to back-up the sending queue
writer.write(minimalTrace)
writer.flush()
then:
numPublished == 1
numFailedPublish == 0
when:
// send many traces to flood the sender queue...
(1..20).each {
writer.write(minimalTrace)
}
then:
// might spill back into the Disruptor slightly, but sender queue is currently unbounded
numPublished == 1 + 20
numFailedPublish == 0
when:
// now, fill-up the disruptor buffer as well
(1..bufferSize * 2).each {
writer.write(minimalTrace)
}
then:
// Disruptor still doesn't reject because the sender queue is unbounded
(numPublished + numFailedPublish) == (1 + 20 + bufferSize * 2)
numFailedPublish >= 0
cleanup:
writer.close()
agent.close()
}
def "multi threaded"() {
def numPublished = new AtomicInteger(0)
def numFailedPublish = new AtomicInteger(0)
def numRepSent = new AtomicInteger(0)
setup:
def minimalTrace = createMinimalTrace()
    // Need to set up a dummy agent for the final send callback to work
def agent = httpServer {
handlers {
put("v0.4/traces") {
response.status(200).send()
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
// This test focuses just on failed publish, so not verifying every callback
def monitor = Stub(DDAgentWriter.Monitor)
monitor.onPublish(_, _) >> {
numPublished.incrementAndGet()
}
monitor.onFailedPublish(_, _) >> {
numFailedPublish.incrementAndGet()
}
monitor.onSend(_, _, _, _) >> { writer, repCount, sizeInBytes, response ->
numRepSent.addAndGet(repCount)
}
def writer = new DDAgentWriter(api, monitor)
writer.start()
when:
def producer = {
(1..100).each {
writer.write(minimalTrace)
}
} as Runnable
def t1 = new Thread(producer)
t1.start()
def t2 = new Thread(producer)
t2.start()
t1.join()
t2.join()
writer.flush()
then:
def totalTraces = 100 + 100
numPublished.get() == totalTraces
numRepSent.get() == totalTraces
cleanup:
writer.close()
agent.close()
}
def "statsd success"() {
def numTracesAccepted = 0
def numRequests = 0
def numResponses = 0
setup:
def minimalTrace = createMinimalTrace()
    // Need to set up a dummy agent for the final send callback to work
def agent = httpServer {
handlers {
put("v0.4/traces") {
response.status(200).send()
}
}
}
def api = new DDApi("localhost", agent.address.port, null)
def statsd = Stub(StatsDClient)
statsd.incrementCounter("queue.accepted") >> { stat ->
numTracesAccepted += 1
}
statsd.incrementCounter("api.requests") >> { stat ->
numRequests += 1
}
statsd.incrementCounter("api.responses", _) >> { stat, tags ->
numResponses += 1
}
def monitor = new DDAgentWriter.StatsDMonitor(statsd)
def writer = new DDAgentWriter(api, monitor)
writer.start()
when:
writer.write(minimalTrace)
writer.flush()
then:
numTracesAccepted == 1
numRequests == 1
numResponses == 1
cleanup:
agent.close()
writer.close()
}
def "statsd comm failure"() {
def numRequests = 0
def numResponses = 0
def numErrors = 0
setup:
def minimalTrace = createMinimalTrace()
    // DQH -- need to set up a dummy agent for the final send callback to work
def api = new DDApi("localhost", 8192, null) {
DDApi.Response sendSerializedTraces(
int representativeCount,
Integer sizeInBytes,
List<byte[]> traces) {
// simulating a communication failure to a server
return DDApi.Response.failed(new IOException("comm error"))
}
}
def statsd = Stub(StatsDClient)
statsd.incrementCounter("api.requests") >> { stat ->
numRequests += 1
}
statsd.incrementCounter("api.responses", _) >> { stat, tags ->
numResponses += 1
}
statsd.incrementCounter("api.errors", _) >> { stat ->
numErrors += 1
}
def monitor = new DDAgentWriter.StatsDMonitor(statsd)
def writer = new DDAgentWriter(api, monitor)
writer.start()
when:
writer.write(minimalTrace)
writer.flush()
then:
numRequests == 1
numResponses == 0
numErrors == 1
cleanup:
writer.close()
}
}

View File

@ -1,243 +0,0 @@
package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.JsonNode
import datadog.opentracing.SpanFactory
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.DDApi.ResponseListener
import datadog.trace.util.test.DDSpecification
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer
class DDApiTest extends DDSpecification {
static mapper = DDApi.OBJECT_MAPPER
def "sending an empty list of traces returns no errors"() {
setup:
def agent = httpServer {
handlers {
put("v0.4/traces") {
if (request.contentType != "application/msgpack") {
response.status(400).send("wrong type: $request.contentType")
} else if (request.contentLength <= 0) {
response.status(400).send("no content")
} else {
response.status(200).send()
}
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
def response = client.sendTraces([])
response.success()
response.status() == 200
cleanup:
agent.close()
}
def "non-200 response"() {
setup:
def agent = httpServer {
handlers {
put("v0.4/traces") {
response.status(404).send()
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.3/traces"
def response = client.sendTraces([])
!response.success()
response.status() == 404
cleanup:
agent.close()
}
def "content is sent as MSGPACK"() {
setup:
def agent = httpServer {
handlers {
put("v0.4/traces") {
response.send()
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${agent.address.port}/v0.4/traces"
client.sendTraces(traces).success()
agent.lastRequest.contentType == "application/msgpack"
agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java"
agent.lastRequest.headers.get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown")
agent.lastRequest.headers.get("Datadog-Meta-Tracer-Version") == "Stubbed-Test-Version"
agent.lastRequest.headers.get("X-Datadog-Trace-Count") == "${traces.size()}"
convertList(agent.lastRequest.body) == expectedRequestBody
cleanup:
agent.close()
    // Populate thread info dynamically as it is different when run via Gradle vs IDEA.
where:
traces | expectedRequestBody
[] | []
[[SpanFactory.newSpanOf(1L).setTag("service.name", "my-service")]] | [[new TreeMap<>([
"duration" : 0,
"error" : 0,
"meta" : ["thread.name": Thread.currentThread().getName(), "thread.id": "${Thread.currentThread().id}"],
"metrics" : [:],
"name" : "fakeOperation",
"parent_id": 0,
"resource" : "fakeResource",
"service" : "my-service",
"span_id" : 1,
"start" : 1000,
"trace_id" : 1,
"type" : "fakeType"
])]]
[[SpanFactory.newSpanOf(100L).setTag("resource.name", "my-resource")]] | [[new TreeMap<>([
"duration" : 0,
"error" : 0,
"meta" : ["thread.name": Thread.currentThread().getName(), "thread.id": "${Thread.currentThread().id}"],
"metrics" : [:],
"name" : "fakeOperation",
"parent_id": 0,
"resource" : "my-resource",
"service" : "fakeService",
"span_id" : 1,
"start" : 100000,
"trace_id" : 1,
"type" : "fakeType"
])]]
}
def "Api ResponseListeners see 200 responses"() {
setup:
def agentResponse = new AtomicReference<String>(null)
ResponseListener responseListener = { String endpoint, JsonNode responseJson ->
agentResponse.set(responseJson.toString())
}
def agent = httpServer {
handlers {
put("v0.4/traces") {
def status = request.contentLength > 0 ? 200 : 500
response.status(status).send('{"hello":"test"}')
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
client.addResponseListener(responseListener)
when:
client.sendTraces([[], [], []])
then:
agentResponse.get() == '{"hello":"test"}'
agent.lastRequest.headers.get("Datadog-Meta-Lang") == "java"
agent.lastRequest.headers.get("Datadog-Meta-Lang-Version") == System.getProperty("java.version", "unknown")
agent.lastRequest.headers.get("Datadog-Meta-Tracer-Version") == "Stubbed-Test-Version"
agent.lastRequest.headers.get("X-Datadog-Trace-Count") == "3" // false data shows the value provided via traceCounter.
cleanup:
agent.close()
}
def "Api Downgrades to v3 if v0.4 not available"() {
setup:
def v3Agent = httpServer {
handlers {
put("v0.3/traces") {
def status = request.contentLength > 0 ? 200 : 500
response.status(status).send()
}
}
}
def client = new DDApi("localhost", v3Agent.address.port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${v3Agent.address.port}/v0.3/traces"
client.sendTraces([]).success()
cleanup:
v3Agent.close()
}
def "Api Downgrades to v3 if timeout exceeded (#delayTrace, #badPort)"() {
// This test is unfortunately only exercising the read timeout, not the connect timeout.
setup:
def agent = httpServer {
handlers {
put("v0.3/traces") {
def status = request.contentLength > 0 ? 200 : 500
response.status(status).send()
}
put("v0.4/traces") {
Thread.sleep(delayTrace)
def status = request.contentLength > 0 ? 200 : 500
response.status(status).send()
}
}
}
def port = badPort ? 999 : agent.address.port
def client = new DDApi("localhost", port, null)
expect:
client.tracesUrl.toString() == "http://localhost:${port}/$endpointVersion/traces"
cleanup:
agent.close()
where:
endpointVersion | delayTrace | badPort
"v0.4" | 0 | false
"v0.3" | 0 | true
"v0.4" | 500 | false
"v0.3" | 30000 | false
}
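// The rows above bound the probe's read timeout: a 500ms delay still detects
// v0.4, while a 30s delay (or an unreachable port) downgrades the client to
// v0.3, so the timeout presumably sits somewhere between those two values.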
def "verify content length"() {
setup:
def receivedContentLength = new AtomicLong()
def agent = httpServer {
handlers {
put("v0.4/traces") {
receivedContentLength.set(request.contentLength)
response.status(200).send()
}
}
}
def client = new DDApi("localhost", agent.address.port, null)
when:
def success = client.sendTraces(traces).success()
then:
success
receivedContentLength.get() == expectedLength
cleanup:
agent.close()
where:
expectedLength | traces
1 | []
3 | [[], []]
16 | (1..15).collect { [] }
19 | (1..16).collect { [] }
65538 | (1..((1 << 16) - 1)).collect { [] }
65541 | (1..(1 << 16)).collect { [] }
}
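// The expected lengths above follow from msgpack's array-header encoding:
// arrays with fewer than 16 elements use a 1-byte fixarray header, 16..65535
// elements use a 3-byte array16 header, and 65536+ use a 5-byte array32
// header; each empty inner trace encodes as the single byte 0x90. A quick
// sanity check using the same msgpack API as SerializationTest:
//
//   def packer = MessagePack.newDefaultBufferPacker()
//   packer.packArrayHeader(16)
//   (1..16).each { packer.packArrayHeader(0) }
//   assert packer.toByteArray().length == 19 // 3-byte header + 16 bytes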
static List<List<TreeMap<String, Object>>> convertList(byte[] bytes) {
return mapper.readValue(bytes, new TypeReference<List<List<TreeMap<String, Object>>>>() {})
}
}

View File

@ -1,61 +0,0 @@
package datadog.trace.api.writer
import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import datadog.trace.util.test.DDSpecification
import org.msgpack.core.MessagePack
import org.msgpack.jackson.dataformat.MessagePackFactory
import spock.lang.Shared
import static java.util.Collections.singletonMap
class SerializationTest extends DDSpecification {
@Shared
def jsonMapper = new ObjectMapper()
@Shared
def mpMapper = new ObjectMapper(new MessagePackFactory())
def "test json mapper serialization"() {
setup:
def map = ["key1": "val1"]
def serializedMap = mapper.writeValueAsBytes(map)
def serializedList = "[${new String(serializedMap)}]".getBytes()
when:
def result = mapper.readValue(serializedList, new TypeReference<List<Map<String, String>>>() {})
then:
result == [map]
new String(serializedList) == '[{"key1":"val1"}]'
where:
mapper = jsonMapper
}
def "test msgpack mapper serialization"() {
setup:
def serializedMaps = input.collect {
mapper.writeValueAsBytes(it)
}
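// Frame the pre-encoded maps as one msgpack array: packArrayHeader writes
// only the element count, and writePayload appends each already-serialized
// map as raw bytes, yielding a valid msgpack array of maps.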
def packer = MessagePack.newDefaultBufferPacker()
packer.packArrayHeader(serializedMaps.size())
serializedMaps.each {
packer.writePayload(it)
}
def serializedList = packer.toByteArray()
when:
def result = mapper.readValue(serializedList, new TypeReference<List<Map<String, String>>>() {})
then:
result == input
where:
mapper = mpMapper
// GStrings get odd results in the serializer.
input = (1..1).collect { singletonMap("key$it".toString(), "val$it".toString()) }
}
}

View File

@ -1,170 +0,0 @@
import com.fasterxml.jackson.databind.JsonNode
import datadog.opentracing.DDSpan
import datadog.opentracing.DDSpanContext
import datadog.opentracing.DDTracer
import datadog.opentracing.PendingTrace
import datadog.trace.common.writer.DDApi
import datadog.trace.common.writer.ListWriter
import datadog.trace.util.test.DDSpecification
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy
import spock.lang.Requires
import spock.lang.Shared
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
class DDApiIntegrationTest {
// Do not run tests locally on Java 7 since testcontainers is not compatible with Java 7.
// It is fine to run on CI because CI provides the trace agent externally, not through testcontainers.
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
static class DDApiIntegrationV4Test extends DDSpecification {
static final WRITER = new ListWriter()
static final TRACER = new DDTracer(WRITER)
static final CONTEXT = new DDSpanContext(
1G,
1G,
0G,
"fakeService",
"fakeOperation",
"fakeResource",
null,
Collections.emptyMap(),
false,
"fakeType",
Collections.emptyMap(),
new PendingTrace(TRACER, 1G),
TRACER)
// Looks like OkHttp needs to resolve this host, even for connections over a unix domain socket
static final SOMEHOST = "datadoghq.com"
static final SOMEPORT = 123
/*
Note: the type of the fields below has to stay undefined ('def'), otherwise tests will fail in CI on Java 7 because
'testcontainers' is built for Java 8 and Java 7 cannot load this class.
*/
@Shared
def agentContainer
@Shared
def agentContainerHost = "localhost"
@Shared
def agentContainerPort = 8126
@Shared
Process process
@Shared
File socketPath
def api
def unixDomainSocketApi
def endpoint = new AtomicReference<String>(null)
def agentResponse = new AtomicReference<String>(null)
DDApi.ResponseListener responseListener = { String receivedEndpoint, JsonNode responseJson ->
endpoint.set(receivedEndpoint)
agentResponse.set(responseJson.toString())
}
def setupSpec() {
/*
CI will provide us with a trace agent container running alongside our build.
When building locally, however, we need to take matters into our own hands
and we use 'testcontainers' for this.
*/
if ("true" != System.getenv("CI")) {
agentContainer = new GenericContainer("datadog/docker-dd-agent:latest")
.withEnv(["DD_APM_ENABLED": "true",
"DD_BIND_HOST" : "0.0.0.0",
"DD_API_KEY" : "invalid_key_but_this_is_fine",
"DD_LOGS_STDOUT": "yes"])
.withExposedPorts(datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT)
.withStartupTimeout(Duration.ofSeconds(120))
// Apparently we need to sleep for a bit so the agent populates `{"service:,env:":1}` in rate_by_service.
// This is clearly a race condition, and maybe we should avoid verifying the complete response.
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10)))
// .withLogConsumer { output ->
// print output.utf8String
// }
agentContainer.start()
agentContainerHost = agentContainer.containerIpAddress
agentContainerPort = agentContainer.getMappedPort(datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT)
}
File tmpDir = File.createTempDir()
tmpDir.deleteOnExit()
socketPath = new File(tmpDir, "socket")
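// socat bridges a unix domain socket to the agent's TCP port so the
// unix-domain-socket code path can be tested against a TCP-only agent container.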
process = Runtime.getRuntime().exec("socat UNIX-LISTEN:${socketPath},reuseaddr,fork TCP-CONNECT:${agentContainerHost}:${agentContainerPort}")
}
def cleanupSpec() {
if (agentContainer) {
agentContainer.stop()
}
process.destroy()
}
def setup() {
api = new DDApi(agentContainerHost, agentContainerPort, v4(), null)
api.addResponseListener(responseListener)
unixDomainSocketApi = new DDApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString())
unixDomainSocketApi.addResponseListener(responseListener)
}
boolean v4() {
return true
}
def "Sending traces succeeds (test #test)"() {
expect:
api.sendTraces(traces)
if (v4()) {
assert endpoint.get() == "http://${agentContainerHost}:${agentContainerPort}/v0.4/traces"
assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}'
}
where:
traces | test
[] | 1
[[], []] | 2
[[new DDSpan(1, CONTEXT)]] | 3
[[new DDSpan(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()), CONTEXT)]] | 4
(1..15).collect { [] } | 5
(1..16).collect { [] } | 6
// Larger traces take more than 1 second to send to the agent and get a timeout exception:
// (1..((1 << 16) - 1)).collect { [] } | 7
// (1..(1 << 16)).collect { [] } | 8
}
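// The asserted body appears to be the agent's priority-sampling payload:
// rate_by_service maps "service:<name>,env:<env>" keys to sampling rates, and
// a fresh agent that has seen no spans reports the single default entry
// "service:,env:" with rate 1.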
def "Sending traces to unix domain socket succeeds (test #test)"() {
expect:
unixDomainSocketApi.sendTraces(traces)
if (v4()) {
assert endpoint.get() == "http://${SOMEHOST}:${SOMEPORT}/v0.4/traces"
assert agentResponse.get() == '{"rate_by_service":{"service:,env:":1}}'
}
where:
traces | test
[] | 1
[[], []] | 2
[[new DDSpan(1, CONTEXT)]] | 3
[[new DDSpan(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()), CONTEXT)]] | 4
}
}
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
static class DDApiIntegrationV3Test extends DDApiIntegrationV4Test {
boolean v4() {
return false
}
def cleanup() {
assert endpoint.get() == null
assert agentResponse.get() == null
}
}
}

View File

@ -8,8 +8,6 @@ ext {
slf4j : "1.7.29",
guava : "20.0", // Last version to support Java 7
jackson : "2.10.0", // https://nvd.nist.gov/vuln/detail/CVE-2019-16942 et al
spock : "1.3-groovy-$spockGroovyVer",
groovy : groovyVer,
logback : "1.2.3",
@ -33,10 +31,6 @@ ext {
// General
slf4j : "org.slf4j:slf4j-api:${versions.slf4j}",
guava : "com.google.guava:guava:$versions.guava",
jackson : [
dependencies.create(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: versions.jackson),
dependencies.create(group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.18'),
],
bytebuddy : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy', version: "${versions.bytebuddy}"),
bytebuddyagent : dependencies.create(group: 'net.bytebuddy', name: 'byte-buddy-agent', version: "${versions.bytebuddy}"),
autoservice : [