Merge pull request #338 from DataDog/tyler/es-meta

Add additional metadata to ES client calls.
This commit is contained in:
Tyler Benson 2018-05-30 11:58:41 +10:00 committed by GitHub
commit 5fe58dc344
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1408 additions and 55 deletions

View File

@ -11,6 +11,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.DDAdvice; import datadog.trace.agent.tooling.DDAdvice;
import datadog.trace.agent.tooling.DDTransformers; import datadog.trace.agent.tooling.DDTransformers;
import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags; import datadog.trace.api.DDTags;
import io.opentracing.Scope; import io.opentracing.Scope;
@ -45,6 +46,11 @@ public class Elasticsearch2TransportClientInstrumentation extends Instrumenter.C
// If we want to be more generic, we could instrument the interface instead: // If we want to be more generic, we could instrument the interface instead:
// .and(hasSuperType(named("org.elasticsearch.client.ElasticsearchClient")))) // .and(hasSuperType(named("org.elasticsearch.client.ElasticsearchClient"))))
classLoaderHasClasses("org.elasticsearch.plugins.SitePlugin")) classLoaderHasClasses("org.elasticsearch.plugins.SitePlugin"))
.transform(
new HelperInjector(
"com.google.common.base.Preconditions",
"com.google.common.base.Joiner",
"datadog.trace.instrumentation.elasticsearch2.TransportActionListener"))
.transform(DDTransformers.defaultTransformers()) .transform(DDTransformers.defaultTransformers())
.transform( .transform(
DDAdvice.create() DDAdvice.create()
@ -78,7 +84,7 @@ public class Elasticsearch2TransportClientInstrumentation extends Instrumenter.C
.withTag("elasticsearch.request", actionRequest.getClass().getSimpleName()) .withTag("elasticsearch.request", actionRequest.getClass().getSimpleName())
.startActive(false); .startActive(false);
actionListener = new TransportActionListener<>(actionListener, scope.span()); actionListener = new TransportActionListener<>(actionRequest, actionListener, scope.span());
return scope; return scope;
} }

View File

@ -2,30 +2,80 @@ package datadog.trace.instrumentation.elasticsearch2;
import static io.opentracing.log.Fields.ERROR_OBJECT; import static io.opentracing.log.Fields.ERROR_OBJECT;
import com.google.common.base.Joiner;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import java.util.Collections; import java.util.Collections;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
public class TransportActionListener<T> implements ActionListener<T> { public class TransportActionListener<T extends ActionResponse> implements ActionListener<T> {
private final ActionListener<T> listener; private final ActionListener<T> listener;
private final Span span; private final Span span;
public TransportActionListener(final ActionListener<T> listener, final Span span) { public TransportActionListener(
final ActionRequest actionRequest, final ActionListener<T> listener, final Span span) {
this.listener = listener; this.listener = listener;
this.span = span; this.span = span;
onRequest(actionRequest);
}
private void onRequest(final ActionRequest request) {
if (request instanceof IndicesRequest) {
final IndicesRequest req = (IndicesRequest) request;
if (req.indices() != null) {
span.setTag("elasticsearch.request.indices", Joiner.on(",").join(req.indices()));
}
}
if (request instanceof DocumentRequest) {
final DocumentRequest req = (DocumentRequest) request;
span.setTag("elasticsearch.request.write.type", req.type());
span.setTag("elasticsearch.request.write.routing", req.routing());
}
} }
@Override @Override
public void onResponse(final T response) { public void onResponse(final T response) {
if (response instanceof ActionResponse) { if (response.remoteAddress() != null) {
final ActionResponse ar = (ActionResponse) response; Tags.PEER_HOSTNAME.set(span, response.remoteAddress().getHost());
if (ar.remoteAddress() != null) { Tags.PEER_HOST_IPV4.set(span, response.remoteAddress().getAddress());
Tags.PEER_HOSTNAME.set(span, ar.remoteAddress().getHost()); Tags.PEER_PORT.set(span, response.remoteAddress().getPort());
Tags.PEER_PORT.set(span, ar.remoteAddress().getPort()); }
if (response instanceof GetResponse) {
final GetResponse resp = (GetResponse) response;
span.setTag("elasticsearch.type", resp.getType());
span.setTag("elasticsearch.id", resp.getId());
span.setTag("elasticsearch.version", resp.getVersion());
}
if (response instanceof BroadcastResponse) {
final BroadcastResponse resp = (BroadcastResponse) response;
span.setTag("elasticsearch.shard.broadcast.total", resp.getTotalShards());
span.setTag("elasticsearch.shard.broadcast.successful", resp.getSuccessfulShards());
span.setTag("elasticsearch.shard.broadcast.failed", resp.getFailedShards());
}
if (response instanceof BulkShardResponse) {
final BulkShardResponse resp = (BulkShardResponse) response;
span.setTag("elasticsearch.shard.bulk.id", resp.getShardId().getId());
span.setTag("elasticsearch.shard.bulk.index", resp.getShardId().getIndex());
}
if (response instanceof BaseNodesResponse) {
final BaseNodesResponse resp = (BaseNodesResponse) response;
if (resp.failures().length > 0) {
span.setTag("elasticsearch.node.failures", resp.failures().length);
} }
span.setTag("elasticsearch.node.cluster.name", resp.getClusterName().value());
} }
try { try {
@ -37,6 +87,7 @@ public class TransportActionListener<T> implements ActionListener<T> {
@Override @Override
public void onFailure(final Throwable e) { public void onFailure(final Throwable e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e)); span.log(Collections.singletonMap(ERROR_OBJECT, e));
try { try {

View File

@ -4,6 +4,7 @@ import io.opentracing.tag.Tags
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.node.Node import org.elasticsearch.node.Node
import org.elasticsearch.node.NodeBuilder import org.elasticsearch.node.NodeBuilder
import spock.lang.Shared import spock.lang.Shared
@ -38,6 +39,8 @@ class Elasticsearch2NodeClientTest extends AgentTestRunner {
.build() .build()
testNode = NodeBuilder.newInstance().clusterName("test-cluster").settings(settings).build() testNode = NodeBuilder.newInstance().clusterName("test-cluster").settings(settings).build()
testNode.start() testNode.start()
testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000)
TEST_WRITER.waitForTraces(1)
} }
def cleanupSpec() { def cleanupSpec() {
@ -64,6 +67,7 @@ class Elasticsearch2NodeClientTest extends AgentTestRunner {
resourceName "ClusterHealthAction" resourceName "ClusterHealthAction"
operationName "elasticsearch.query" operationName "elasticsearch.query"
spanType null spanType null
parent()
tags { tags {
"$Tags.COMPONENT.key" "elasticsearch-java" "$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
@ -75,4 +79,185 @@ class Elasticsearch2NodeClientTest extends AgentTestRunner {
} }
} }
} }
def "test elasticsearch error"() {
when:
client.prepareGet(indexName, indexType, id).get()
then:
thrown IndexNotFoundException
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
errored true
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
errorTags IndexNotFoundException, "no such index"
defaultTags()
}
}
}
}
where:
indexName = "invalid-index"
indexType = "test-type"
id = "1"
}
def "test elasticsearch get"() {
setup:
def indexResult = client.admin().indices().prepareCreate(indexName).get()
expect:
indexResult.acknowledged
when:
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000)
def emptyResult = client.prepareGet(indexName, indexType, id).get()
then:
!emptyResult.isExists()
emptyResult.id == id
emptyResult.type == indexType
emptyResult.index == indexName
when:
def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get()
then:
createResult.id == id
createResult.type == indexType
createResult.index == indexName
when:
def result = client.prepareGet(indexName, indexType, id).get()
then:
result.isExists()
result.id == id
result.type == indexType
result.index == indexName
and:
assertTraces(TEST_WRITER, 6) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "CreateIndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "CreateIndexAction"
"elasticsearch.request" "CreateIndexRequest"
"elasticsearch.request.indices" indexName
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "ClusterHealthAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "ClusterHealthAction"
"elasticsearch.request" "ClusterHealthRequest"
defaultTags()
}
}
}
trace(2, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version"(-1)
defaultTags()
}
}
}
trace(3, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "PutMappingAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "PutMappingAction"
"elasticsearch.request" "PutMappingRequest"
"elasticsearch.request.indices" indexName
defaultTags()
}
}
}
trace(4, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "IndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "IndexAction"
"elasticsearch.request" "IndexRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.request.write.type" indexType
defaultTags()
}
}
}
trace(5, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version" 1
defaultTags()
}
}
}
}
where:
indexName = "test-index"
indexType = "test-type"
id = "1"
}
} }

View File

@ -6,8 +6,10 @@ import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.node.Node import org.elasticsearch.node.Node
import org.elasticsearch.node.NodeBuilder import org.elasticsearch.node.NodeBuilder
import org.elasticsearch.transport.RemoteTransportException
import spock.lang.Shared import spock.lang.Shared
import static datadog.trace.agent.test.ListWriterAssert.assertTraces import static datadog.trace.agent.test.ListWriterAssert.assertTraces
@ -48,6 +50,8 @@ class Elasticsearch2TransportClientTest extends AgentTestRunner {
.build() .build()
).build() ).build()
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), TCP_PORT)) client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), TCP_PORT))
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000)
TEST_WRITER.waitForTraces(1)
} }
def cleanupSpec() { def cleanupSpec() {
@ -74,10 +78,12 @@ class Elasticsearch2TransportClientTest extends AgentTestRunner {
resourceName "ClusterHealthAction" resourceName "ClusterHealthAction"
operationName "elasticsearch.query" operationName "elasticsearch.query"
spanType null spanType null
parent()
tags { tags {
"$Tags.COMPONENT.key" "elasticsearch-java" "$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "127.0.0.1" "$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT "$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "ClusterHealthAction" "elasticsearch.action" "ClusterHealthAction"
"elasticsearch.request" "ClusterHealthRequest" "elasticsearch.request" "ClusterHealthRequest"
@ -87,4 +93,200 @@ class Elasticsearch2TransportClientTest extends AgentTestRunner {
} }
} }
} }
def "test elasticsearch error"() {
when:
client.prepareGet(indexName, indexType, id).get()
then:
thrown IndexNotFoundException
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
errored true
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
errorTags RemoteTransportException, String
defaultTags()
}
}
}
}
where:
indexName = "invalid-index"
indexType = "test-type"
id = "1"
}
def "test elasticsearch get"() {
setup:
def indexResult = client.admin().indices().prepareCreate(indexName).get()
expect:
indexResult.acknowledged
when:
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000)
def emptyResult = client.prepareGet(indexName, indexType, id).get()
then:
!emptyResult.isExists()
emptyResult.id == id
emptyResult.type == indexType
emptyResult.index == indexName
when:
def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get()
then:
createResult.id == id
createResult.type == indexType
createResult.index == indexName
when:
def result = client.prepareGet(indexName, indexType, id).get()
then:
result.isExists()
result.id == id
result.type == indexType
result.index == indexName
and:
assertTraces(TEST_WRITER, 6) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "CreateIndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "CreateIndexAction"
"elasticsearch.request" "CreateIndexRequest"
"elasticsearch.request.indices" indexName
"$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "ClusterHealthAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "ClusterHealthAction"
"elasticsearch.request" "ClusterHealthRequest"
"$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
defaultTags()
}
}
}
trace(2, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version"(-1)
defaultTags()
}
}
}
trace(3, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "PutMappingAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "PutMappingAction"
"elasticsearch.request" "PutMappingRequest"
"elasticsearch.request.indices" indexName
defaultTags()
}
}
}
trace(4, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "IndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "IndexAction"
"elasticsearch.request" "IndexRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.request.write.type" indexType
defaultTags()
}
}
}
trace(5, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version" 1
defaultTags()
}
}
}
}
where:
indexName = "test-index"
indexType = "test-type"
id = "1"
}
} }

View File

@ -11,6 +11,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.DDAdvice; import datadog.trace.agent.tooling.DDAdvice;
import datadog.trace.agent.tooling.DDTransformers; import datadog.trace.agent.tooling.DDTransformers;
import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags; import datadog.trace.api.DDTags;
import io.opentracing.Scope; import io.opentracing.Scope;
@ -45,6 +46,11 @@ public class Elasticsearch5TransportClientInstrumentation extends Instrumenter.C
// If we want to be more generic, we could instrument the interface instead: // If we want to be more generic, we could instrument the interface instead:
// .and(hasSuperType(named("org.elasticsearch.client.ElasticsearchClient")))) // .and(hasSuperType(named("org.elasticsearch.client.ElasticsearchClient"))))
classLoaderHasClasses("org.elasticsearch.percolator.TransportMultiPercolateAction")) classLoaderHasClasses("org.elasticsearch.percolator.TransportMultiPercolateAction"))
.transform(
new HelperInjector(
"com.google.common.base.Preconditions",
"com.google.common.base.Joiner",
"datadog.trace.instrumentation.elasticsearch5.TransportActionListener"))
.transform(DDTransformers.defaultTransformers()) .transform(DDTransformers.defaultTransformers())
.transform( .transform(
DDAdvice.create() DDAdvice.create()
@ -78,7 +84,7 @@ public class Elasticsearch5TransportClientInstrumentation extends Instrumenter.C
.withTag("elasticsearch.request", actionRequest.getClass().getSimpleName()) .withTag("elasticsearch.request", actionRequest.getClass().getSimpleName())
.startActive(false); .startActive(false);
actionListener = new TransportActionListener<>(actionListener, scope.span()); actionListener = new TransportActionListener<>(actionRequest, actionListener, scope.span());
return scope; return scope;
} }

View File

@ -2,20 +2,49 @@ package datadog.trace.instrumentation.elasticsearch5;
import static io.opentracing.log.Fields.ERROR_OBJECT; import static io.opentracing.log.Fields.ERROR_OBJECT;
import com.google.common.base.Joiner;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import java.util.Collections; import java.util.Collections;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
public class TransportActionListener<T extends ActionResponse> implements ActionListener<T> { public class TransportActionListener<T extends ActionResponse> implements ActionListener<T> {
private final ActionListener<T> listener; private final ActionListener<T> listener;
private final Span span; private final Span span;
public TransportActionListener(final ActionListener<T> listener, final Span span) { public TransportActionListener(
final ActionRequest actionRequest, final ActionListener<T> listener, final Span span) {
this.listener = listener; this.listener = listener;
this.span = span; this.span = span;
onRequest(actionRequest);
}
private void onRequest(final ActionRequest request) {
if (request != null) {
span.setTag("elasticsearch.request.description", request.getDescription());
}
if (request instanceof IndicesRequest) {
final IndicesRequest req = (IndicesRequest) request;
if (req.indices() != null) {
span.setTag("elasticsearch.request.indices", Joiner.on(",").join(req.indices()));
}
}
if (request instanceof DocumentRequest) {
final DocumentRequest req = (DocumentRequest) request;
span.setTag("elasticsearch.request.write.type", req.type());
span.setTag("elasticsearch.request.write.routing", req.routing());
}
} }
@Override @Override
@ -26,6 +55,46 @@ public class TransportActionListener<T extends ActionResponse> implements Action
Tags.PEER_PORT.set(span, response.remoteAddress().getPort()); Tags.PEER_PORT.set(span, response.remoteAddress().getPort());
} }
if (response instanceof GetResponse) {
final GetResponse resp = (GetResponse) response;
span.setTag("elasticsearch.type", resp.getType());
span.setTag("elasticsearch.id", resp.getId());
span.setTag("elasticsearch.version", resp.getVersion());
}
if (response instanceof BroadcastResponse) {
final BroadcastResponse resp = (BroadcastResponse) response;
span.setTag("elasticsearch.shard.broadcast.total", resp.getTotalShards());
span.setTag("elasticsearch.shard.broadcast.successful", resp.getSuccessfulShards());
span.setTag("elasticsearch.shard.broadcast.failed", resp.getFailedShards());
}
if (response instanceof ReplicationResponse) {
final ReplicationResponse resp = (ReplicationResponse) response;
span.setTag("elasticsearch.shard.replication.total", resp.getShardInfo().getTotal());
span.setTag(
"elasticsearch.shard.replication.successful", resp.getShardInfo().getSuccessful());
span.setTag("elasticsearch.shard.replication.failed", resp.getShardInfo().getFailed());
}
if (response instanceof IndexResponse) {
span.setTag("elasticsearch.response.status", ((IndexResponse) response).status().getStatus());
}
if (response instanceof BulkShardResponse) {
final BulkShardResponse resp = (BulkShardResponse) response;
span.setTag("elasticsearch.shard.bulk.id", resp.getShardId().getId());
span.setTag("elasticsearch.shard.bulk.index", resp.getShardId().getIndexName());
}
if (response instanceof BaseNodesResponse) {
final BaseNodesResponse resp = (BaseNodesResponse) response;
if (resp.hasFailures()) {
span.setTag("elasticsearch.node.failures", resp.failures().size());
}
span.setTag("elasticsearch.node.cluster.name", resp.getClusterName().value());
}
try { try {
listener.onResponse(response); listener.onResponse(response);
} finally { } finally {
@ -35,6 +104,7 @@ public class TransportActionListener<T extends ActionResponse> implements Action
@Override @Override
public void onFailure(final Exception e) { public void onFailure(final Exception e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e)); span.log(Collections.singletonMap(ERROR_OBJECT, e));
try { try {

View File

@ -5,6 +5,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.settings.Settings
import org.elasticsearch.env.Environment import org.elasticsearch.env.Environment
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.node.Node import org.elasticsearch.node.Node
import org.elasticsearch.node.internal.InternalSettingsPreparer import org.elasticsearch.node.internal.InternalSettingsPreparer
import org.elasticsearch.transport.Netty3Plugin import org.elasticsearch.transport.Netty3Plugin
@ -44,6 +45,8 @@ class Elasticsearch5NodeClientTest extends AgentTestRunner {
.build() .build()
testNode = new Node(new Environment(InternalSettingsPreparer.prepareSettings(settings)), [Netty3Plugin]) testNode = new Node(new Environment(InternalSettingsPreparer.prepareSettings(settings)), [Netty3Plugin])
testNode.start() testNode.start()
testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000)
TEST_WRITER.waitForTraces(1)
} }
def cleanupSpec() { def cleanupSpec() {
@ -82,4 +85,190 @@ class Elasticsearch5NodeClientTest extends AgentTestRunner {
} }
} }
} }
def "test elasticsearch error"() {
when:
client.prepareGet(indexName, indexType, id).get()
then:
thrown IndexNotFoundException
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
errored true
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
errorTags IndexNotFoundException, "no such index"
defaultTags()
}
}
}
}
where:
indexName = "invalid-index"
indexType = "test-type"
id = "1"
}
def "test elasticsearch get"() {
setup:
def indexResult = client.admin().indices().prepareCreate(indexName).get()
expect:
indexResult.acknowledged
when:
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000)
def emptyResult = client.prepareGet(indexName, indexType, id).get()
then:
!emptyResult.isExists()
emptyResult.id == id
emptyResult.type == indexType
emptyResult.index == indexName
when:
def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get()
then:
createResult.id == id
createResult.type == indexType
createResult.index == indexName
createResult.status().status == 201
when:
def result = client.prepareGet(indexName, indexType, id).get()
then:
result.isExists()
result.id == id
result.type == indexType
result.index == indexName
and:
assertTraces(TEST_WRITER, 6) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "CreateIndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "CreateIndexAction"
"elasticsearch.request" "CreateIndexRequest"
"elasticsearch.request.indices" indexName
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "ClusterHealthAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "ClusterHealthAction"
"elasticsearch.request" "ClusterHealthRequest"
defaultTags()
}
}
}
trace(2, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version"(-1)
defaultTags()
}
}
}
trace(3, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "PutMappingAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "PutMappingAction"
"elasticsearch.request" "PutMappingRequest"
defaultTags()
}
}
}
trace(4, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "IndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "IndexAction"
"elasticsearch.request" "IndexRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.request.write.type" indexType
"elasticsearch.response.status" 201
"elasticsearch.shard.replication.total" 2
"elasticsearch.shard.replication.successful" 1
"elasticsearch.shard.replication.failed" 0
"elasticsearch.request.description" "index {[test-index][test-type][1], source[{}]}"
defaultTags()
}
}
}
trace(5, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version" 1
defaultTags()
}
}
}
}
where:
indexName = "test-index"
indexType = "test-type"
id = "1"
}
} }

View File

@ -7,9 +7,11 @@ import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.env.Environment import org.elasticsearch.env.Environment
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.node.Node import org.elasticsearch.node.Node
import org.elasticsearch.node.internal.InternalSettingsPreparer import org.elasticsearch.node.internal.InternalSettingsPreparer
import org.elasticsearch.transport.Netty3Plugin import org.elasticsearch.transport.Netty3Plugin
import org.elasticsearch.transport.RemoteTransportException
import org.elasticsearch.transport.client.PreBuiltTransportClient import org.elasticsearch.transport.client.PreBuiltTransportClient
import spock.lang.Shared import spock.lang.Shared
@ -55,6 +57,8 @@ class Elasticsearch5TransportClientTest extends AgentTestRunner {
.build() .build()
) )
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), TCP_PORT)) client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), TCP_PORT))
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000)
TEST_WRITER.waitForTraces(1)
} }
def cleanupSpec() { def cleanupSpec() {
@ -96,4 +100,186 @@ class Elasticsearch5TransportClientTest extends AgentTestRunner {
} }
} }
} }
def "test elasticsearch error"() {
when:
client.prepareGet(indexName, indexType, id).get()
then:
thrown IndexNotFoundException
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
errored true
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
errorTags RemoteTransportException, String
defaultTags()
}
}
}
}
where:
indexName = "invalid-index"
indexType = "test-type"
id = "1"
}
def "test elasticsearch get"() {
setup:
def indexResult = client.admin().indices().prepareCreate(indexName).get()
expect:
indexResult.acknowledged
when:
def emptyResult = client.prepareGet(indexName, indexType, id).get()
then:
!emptyResult.isExists()
emptyResult.id == id
emptyResult.type == indexType
emptyResult.index == indexName
when:
def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get()
then:
createResult.id == id
createResult.type == indexType
createResult.index == indexName
createResult.status().status == 201
when:
def result = client.prepareGet(indexName, indexType, id).get()
then:
result.isExists()
result.id == id
result.type == indexType
result.index == indexName
and:
assertTraces(TEST_WRITER, 5) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "CreateIndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "CreateIndexAction"
"elasticsearch.request" "CreateIndexRequest"
"elasticsearch.request.indices" indexName
"$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version"(-1)
defaultTags()
}
}
}
trace(2, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "PutMappingAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "PutMappingAction"
"elasticsearch.request" "PutMappingRequest"
defaultTags()
}
}
}
trace(3, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "IndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "IndexAction"
"elasticsearch.request" "IndexRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.request.write.type" indexType
"elasticsearch.response.status" 201
"elasticsearch.shard.replication.total" 2
"elasticsearch.shard.replication.successful" 1
"elasticsearch.shard.replication.failed" 0
"elasticsearch.request.description" "index {[test-index][test-type][1], source[{}]}"
defaultTags()
}
}
}
trace(4, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "127.0.0.1"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version" 1
defaultTags()
}
}
}
}
where:
indexName = "test-index"
indexType = "test-type"
id = "1"
}
} }

View File

@ -11,6 +11,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.DDAdvice; import datadog.trace.agent.tooling.DDAdvice;
import datadog.trace.agent.tooling.DDTransformers; import datadog.trace.agent.tooling.DDTransformers;
import datadog.trace.agent.tooling.HelperInjector;
import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDTags; import datadog.trace.api.DDTags;
import io.opentracing.Scope; import io.opentracing.Scope;
@ -49,6 +50,11 @@ public class Elasticsearch6TransportClientInstrumentation extends Instrumenter.C
// If we want to be more generic, we could instrument the interface instead: // If we want to be more generic, we could instrument the interface instead:
// .and(hasSuperType(named("org.elasticsearch.client.ElasticsearchClient")))) // .and(hasSuperType(named("org.elasticsearch.client.ElasticsearchClient"))))
classLoaderHasClasses("org.elasticsearch.client.RestClientBuilder$2")) classLoaderHasClasses("org.elasticsearch.client.RestClientBuilder$2"))
.transform(
new HelperInjector(
"com.google.common.base.Preconditions",
"com.google.common.base.Joiner",
"datadog.trace.instrumentation.elasticsearch6.TransportActionListener"))
.transform(DDTransformers.defaultTransformers()) .transform(DDTransformers.defaultTransformers())
.transform( .transform(
DDAdvice.create() DDAdvice.create()
@ -82,7 +88,7 @@ public class Elasticsearch6TransportClientInstrumentation extends Instrumenter.C
.withTag("elasticsearch.request", actionRequest.getClass().getSimpleName()) .withTag("elasticsearch.request", actionRequest.getClass().getSimpleName())
.startActive(false); .startActive(false);
actionListener = new TransportActionListener<>(actionListener, scope.span()); actionListener = new TransportActionListener<>(actionRequest, actionListener, scope.span());
return scope; return scope;
} }

View File

@ -2,11 +2,21 @@ package datadog.trace.instrumentation.elasticsearch6;
import static io.opentracing.log.Fields.ERROR_OBJECT; import static io.opentracing.log.Fields.ERROR_OBJECT;
import com.google.common.base.Joiner;
import io.opentracing.Span; import io.opentracing.Span;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import java.util.Collections; import java.util.Collections;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.bulk.BulkShardResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
/** /**
* Most of this class is identical to version 5's instrumentation, but they changed an interface to * Most of this class is identical to version 5's instrumentation, but they changed an interface to
@ -17,9 +27,29 @@ public class TransportActionListener<T extends ActionResponse> implements Action
private final ActionListener<T> listener; private final ActionListener<T> listener;
private final Span span; private final Span span;
public TransportActionListener(final ActionListener<T> listener, final Span span) { public TransportActionListener(
final ActionRequest actionRequest, final ActionListener<T> listener, final Span span) {
this.listener = listener; this.listener = listener;
this.span = span; this.span = span;
onRequest(actionRequest);
}
private void onRequest(final ActionRequest request) {
if (request != null) {
span.setTag("elasticsearch.request.description", request.getDescription());
}
if (request instanceof IndicesRequest) {
final IndicesRequest req = (IndicesRequest) request;
if (req.indices() != null) {
span.setTag("elasticsearch.request.indices", Joiner.on(",").join(req.indices()));
}
}
if (request instanceof DocWriteRequest) {
final DocWriteRequest req = (DocWriteRequest) request;
span.setTag("elasticsearch.request.write.type", req.type());
span.setTag("elasticsearch.request.write.routing", req.routing());
span.setTag("elasticsearch.request.write.version", req.version());
}
} }
@Override @Override
@ -30,6 +60,46 @@ public class TransportActionListener<T extends ActionResponse> implements Action
Tags.PEER_PORT.set(span, response.remoteAddress().getPort()); Tags.PEER_PORT.set(span, response.remoteAddress().getPort());
} }
if (response instanceof GetResponse) {
final GetResponse resp = (GetResponse) response;
span.setTag("elasticsearch.type", resp.getType());
span.setTag("elasticsearch.id", resp.getId());
span.setTag("elasticsearch.version", resp.getVersion());
}
if (response instanceof BroadcastResponse) {
final BroadcastResponse resp = (BroadcastResponse) response;
span.setTag("elasticsearch.shard.broadcast.total", resp.getTotalShards());
span.setTag("elasticsearch.shard.broadcast.successful", resp.getSuccessfulShards());
span.setTag("elasticsearch.shard.broadcast.failed", resp.getFailedShards());
}
if (response instanceof ReplicationResponse) {
final ReplicationResponse resp = (ReplicationResponse) response;
span.setTag("elasticsearch.shard.replication.total", resp.getShardInfo().getTotal());
span.setTag(
"elasticsearch.shard.replication.successful", resp.getShardInfo().getSuccessful());
span.setTag("elasticsearch.shard.replication.failed", resp.getShardInfo().getFailed());
}
if (response instanceof IndexResponse) {
span.setTag("elasticsearch.response.status", ((IndexResponse) response).status().getStatus());
}
if (response instanceof BulkShardResponse) {
final BulkShardResponse resp = (BulkShardResponse) response;
span.setTag("elasticsearch.shard.bulk.id", resp.getShardId().getId());
span.setTag("elasticsearch.shard.bulk.index", resp.getShardId().getIndexName());
}
if (response instanceof BaseNodesResponse) {
final BaseNodesResponse resp = (BaseNodesResponse) response;
if (resp.hasFailures()) {
span.setTag("elasticsearch.node.failures", resp.failures().size());
}
span.setTag("elasticsearch.node.cluster.name", resp.getClusterName().value());
}
try { try {
listener.onResponse(response); listener.onResponse(response);
} finally { } finally {
@ -39,6 +109,7 @@ public class TransportActionListener<T extends ActionResponse> implements Action
@Override @Override
public void onFailure(final Exception e) { public void onFailure(final Exception e) {
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, e)); span.log(Collections.singletonMap(ERROR_OBJECT, e));
try { try {

View File

@ -4,6 +4,7 @@ import io.opentracing.tag.Tags
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.node.InternalSettingsPreparer import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node import org.elasticsearch.node.Node
import org.elasticsearch.transport.Netty4Plugin import org.elasticsearch.transport.Netty4Plugin
@ -41,6 +42,8 @@ class Elasticsearch6NodeClientTest extends AgentTestRunner {
.build() .build()
testNode = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin]) testNode = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin])
testNode.start() testNode.start()
testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000)
TEST_WRITER.waitForTraces(1)
} }
def cleanupSpec() { def cleanupSpec() {
@ -53,9 +56,9 @@ class Elasticsearch6NodeClientTest extends AgentTestRunner {
def "test elasticsearch status"() { def "test elasticsearch status"() {
setup: setup:
def result = client.admin().cluster().health(new ClusterHealthRequest()) def result = client.admin().cluster().health(new ClusterHealthRequest()).get()
def status = result.get().status def status = result.status
expect: expect:
status.name() == "GREEN" status.name() == "GREEN"
@ -78,4 +81,175 @@ class Elasticsearch6NodeClientTest extends AgentTestRunner {
} }
} }
} }
def "test elasticsearch error"() {
when:
client.prepareGet(indexName, indexType, id).get()
then:
thrown IndexNotFoundException
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
errored true
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
errorTags IndexNotFoundException, "no such index"
defaultTags()
}
}
}
}
where:
indexName = "invalid-index"
indexType = "test-type"
id = "1"
}
def "test elasticsearch get"() {
setup:
def indexResult = client.admin().indices().prepareCreate(indexName).get()
expect:
indexResult.index() == indexName
when:
def emptyResult = client.prepareGet(indexName, indexType, id).get()
then:
!emptyResult.isExists()
emptyResult.id == id
emptyResult.type == indexType
emptyResult.index == indexName
when:
def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get()
then:
createResult.id == id
createResult.type == indexType
createResult.index == indexName
createResult.status().status == 201
when:
def result = client.prepareGet(indexName, indexType, id).get()
then:
result.isExists()
result.id == id
result.type == indexType
result.index == indexName
and:
assertTraces(TEST_WRITER, 5) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "CreateIndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "CreateIndexAction"
"elasticsearch.request" "CreateIndexRequest"
"elasticsearch.request.indices" indexName
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version"(-1)
defaultTags()
}
}
}
trace(2, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "PutMappingAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "PutMappingAction"
"elasticsearch.request" "PutMappingRequest"
defaultTags()
}
}
}
trace(3, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "IndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "IndexAction"
"elasticsearch.request" "IndexRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.request.write.type" indexType
"elasticsearch.request.write.version"(-3)
"elasticsearch.response.status" 201
"elasticsearch.shard.replication.total" 2
"elasticsearch.shard.replication.successful" 1
"elasticsearch.shard.replication.failed" 0
"elasticsearch.request.description" "index {[test-index][test-type][1], source[{}]}"
defaultTags()
}
}
}
trace(4, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version" 1
defaultTags()
}
}
}
}
where:
indexName = "test-index"
indexType = "test-type"
id = "1"
}
} }

View File

@ -6,9 +6,11 @@ import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.node.InternalSettingsPreparer import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node import org.elasticsearch.node.Node
import org.elasticsearch.transport.Netty4Plugin import org.elasticsearch.transport.Netty4Plugin
import org.elasticsearch.transport.RemoteTransportException
import org.elasticsearch.transport.client.PreBuiltTransportClient import org.elasticsearch.transport.client.PreBuiltTransportClient
import spock.lang.Shared import spock.lang.Shared
@ -52,6 +54,8 @@ class Elasticsearch6TransportClientTest extends AgentTestRunner {
.build() .build()
) )
client.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), TCP_PORT)) client.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), TCP_PORT))
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000)
TEST_WRITER.waitForTraces(1)
} }
def cleanupSpec() { def cleanupSpec() {
@ -92,4 +96,187 @@ class Elasticsearch6TransportClientTest extends AgentTestRunner {
} }
} }
} }
def "test elasticsearch error"() {
when:
client.prepareGet(indexName, indexType, id).get()
then:
thrown IndexNotFoundException
and:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
errored true
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
errorTags RemoteTransportException, String
defaultTags()
}
}
}
}
where:
indexName = "invalid-index"
indexType = "test-type"
id = "1"
}
def "test elasticsearch get"() {
setup:
def indexResult = client.admin().indices().prepareCreate(indexName).get()
expect:
indexResult.index() == indexName
when:
def emptyResult = client.prepareGet(indexName, indexType, id).get()
then:
!emptyResult.isExists()
emptyResult.id == id
emptyResult.type == indexType
emptyResult.index == indexName
when:
def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get()
then:
createResult.id == id
createResult.type == indexType
createResult.index == indexName
createResult.status().status == 201
when:
def result = client.prepareGet(indexName, indexType, id).get()
then:
result.isExists()
result.id == id
result.type == indexType
result.index == indexName
and:
assertTraces(TEST_WRITER, 5) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "CreateIndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "CreateIndexAction"
"elasticsearch.request" "CreateIndexRequest"
"elasticsearch.request.indices" indexName
defaultTags()
}
}
}
trace(1, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version"(-1)
defaultTags()
}
}
}
trace(2, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "PutMappingAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"elasticsearch.action" "PutMappingAction"
"elasticsearch.request" "PutMappingRequest"
defaultTags()
}
}
}
trace(3, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "IndexAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "IndexAction"
"elasticsearch.request" "IndexRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.request.write.type" indexType
"elasticsearch.request.write.version"(-3)
"elasticsearch.response.status" 201
"elasticsearch.shard.replication.total" 2
"elasticsearch.shard.replication.successful" 1
"elasticsearch.shard.replication.failed" 0
"elasticsearch.request.description" "index {[test-index][test-type][1], source[{}]}"
defaultTags()
}
}
}
trace(4, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GetAction"
operationName "elasticsearch.query"
spanType null
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_HOST_IPV4.key" "127.0.0.1"
"$Tags.PEER_PORT.key" TCP_PORT
"elasticsearch.action" "GetAction"
"elasticsearch.request" "GetRequest"
"elasticsearch.request.indices" indexName
"elasticsearch.type" indexType
"elasticsearch.id" "1"
"elasticsearch.version" 1
defaultTags()
}
}
}
}
where:
indexName = "test-index"
indexType = "test-type"
id = "1"
}
} }

View File

@ -1,6 +1,10 @@
package datadog.trace.agent.test package datadog.trace.agent.test
import datadog.trace.common.writer.ListWriter import datadog.trace.common.writer.ListWriter
import org.codehaus.groovy.runtime.powerassert.PowerAssertionError
import org.spockframework.runtime.Condition
import org.spockframework.runtime.ConditionNotSatisfiedError
import org.spockframework.runtime.model.TextPosition
import static datadog.trace.agent.test.TraceAssert.assertTrace import static datadog.trace.agent.test.TraceAssert.assertTrace
@ -14,21 +18,39 @@ class ListWriterAssert {
size = writer.size() size = writer.size()
} }
static ListWriterAssert assertTraces(ListWriter writer, int expectedSize, static void assertTraces(ListWriter writer, int expectedSize,
@DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { @DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
writer.waitForTraces(expectedSize) try {
assert writer.size() == expectedSize writer.waitForTraces(expectedSize)
def asserter = new ListWriterAssert(writer) assert writer.size() == expectedSize
def clone = (Closure) spec.clone() def asserter = new ListWriterAssert(writer)
clone.delegate = asserter def clone = (Closure) spec.clone()
clone.resolveStrategy = Closure.DELEGATE_FIRST clone.delegate = asserter
clone(asserter) clone.resolveStrategy = Closure.DELEGATE_FIRST
asserter.assertTracesAllVerified() clone(asserter)
asserter asserter.assertTracesAllVerified()
} catch (PowerAssertionError e) {
def stackLine = null
for (int i = 0; i < e.stackTrace.length; i++) {
def className = e.stackTrace[i].className
def skip = className.startsWith("org.codehaus.groovy.") ||
className.startsWith("datadog.trace.agent.test.") ||
className.startsWith("sun.reflect.") ||
className.startsWith("groovy.lang.") ||
className.startsWith("java.lang.")
if (skip) {
continue
}
stackLine = e.stackTrace[i]
break
}
def condition = new Condition(null, "$stackLine", TextPosition.create(stackLine == null ? 0 : stackLine.lineNumber, 0), e.message, null, e)
throw new ConditionNotSatisfiedError(condition, e)
}
} }
TraceAssert trace(int index, int expectedSize, void trace(int index, int expectedSize,
@DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { @DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
if (index >= size) { if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index) throw new ArrayIndexOutOfBoundsException(index)
} }

View File

@ -11,8 +11,8 @@ class SpanAssert {
this.span = span this.span = span
} }
static SpanAssert assertSpan(DDSpan span, static void assertSpan(DDSpan span,
@DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
def asserter = new SpanAssert(span) def asserter = new SpanAssert(span)
def clone = (Closure) spec.clone() def clone = (Closure) spec.clone()
clone.delegate = asserter clone.delegate = asserter
@ -51,7 +51,7 @@ class SpanAssert {
assert span.isError() == errored assert span.isError() == errored
} }
def tags(@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { void tags(@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
return assertTags(span, spec) assertTags(span, spec)
} }
} }

View File

@ -10,8 +10,8 @@ class TagsAssert {
this.tags = new TreeMap(span.tags) this.tags = new TreeMap(span.tags)
} }
static TagsAssert assertTags(DDSpan span, static void assertTags(DDSpan span,
@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { @DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
def asserter = new TagsAssert(span) def asserter = new TagsAssert(span)
def clone = (Closure) spec.clone() def clone = (Closure) spec.clone()
clone.delegate = asserter clone.delegate = asserter
@ -25,35 +25,35 @@ class TagsAssert {
assertedTags.add("thread.name") assertedTags.add("thread.name")
assertedTags.add("thread.id") assertedTags.add("thread.id")
tags["thread.name"] != null assert tags["thread.name"] != null
tags["thread.id"] != null assert tags["thread.id"] != null
} }
def errorTags(Class<Throwable> errorType) { def errorTags(Class<Throwable> errorType) {
errorTags(errorType, null) errorTags(errorType, null)
} }
def errorTags(Class<Throwable> errorType, String message) { def errorTags(Class<Throwable> errorType, Object message) {
assertedTags.add("error") methodMissing("error", [true].toArray())
assertedTags.add("error.type") methodMissing("error.type", [errorType.name].toArray())
assertedTags.add("error.stack") methodMissing("error.stack", [String].toArray())
if (message != null) { if (message != null) {
assertedTags.add("error.msg") methodMissing("error.msg", [message].toArray())
tags["error.msg"] == message
} }
tags["error"] == true
tags["error.type"] == errorType
tags["error.stack"] instanceof String
} }
def methodMissing(String name, args) { def methodMissing(String name, args) {
if (args.length > 1) { if (args.length > 1) {
throw new IllegalArgumentException(args) throw new IllegalArgumentException(args.toString())
} }
assertedTags.add(name) assertedTags.add(name)
assert tags[name] == args[0] def arg = args[0]
if (arg instanceof Class) {
assert ((Class) arg).isInstance(tags[name])
} else {
assert tags[name] == arg
}
} }
void assertTracesAllVerified() { void assertTracesAllVerified() {

View File

@ -14,8 +14,8 @@ class TraceAssert {
size = trace.size() size = trace.size()
} }
static TraceAssert assertTrace(List<DDSpan> trace, int expectedSize, static void assertTrace(List<DDSpan> trace, int expectedSize,
@DelegatesTo(value = File, strategy = Closure.DELEGATE_FIRST) Closure spec) { @DelegatesTo(value = File, strategy = Closure.DELEGATE_FIRST) Closure spec) {
assert trace.size() == expectedSize assert trace.size() == expectedSize
def asserter = new TraceAssert(trace) def asserter = new TraceAssert(trace)
def clone = (Closure) spec.clone() def clone = (Closure) spec.clone()
@ -30,7 +30,7 @@ class TraceAssert {
trace.get(index) trace.get(index)
} }
SpanAssert span(int index, @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { void span(int index, @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
if (index >= size) { if (index >= size) {
throw new ArrayIndexOutOfBoundsException(index) throw new ArrayIndexOutOfBoundsException(index)
} }

View File

@ -232,7 +232,7 @@ public class DDSpanContext implements io.opentracing.SpanContext {
* @param value the value of the tag. tags with null values are ignored. * @param value the value of the tag. tags with null values are ignored.
*/ */
public synchronized void setTag(final String tag, final Object value) { public synchronized void setTag(final String tag, final Object value) {
if (value == null) { if (value == null || (value instanceof String && ((String) value).isEmpty())) {
tags.remove(tag); tags.remove(tag);
return; return;
} }

View File

@ -126,8 +126,7 @@ public class DDTracer implements io.opentracing.Tracer {
} }
}); });
} catch (final IllegalStateException ex) { } catch (final IllegalStateException ex) {
// The JVM might be shutting down. // The JVM is already shutting down.
log.debug("Error adding shutdown hook.", ex);
} }
registry = new CodecRegistry(); registry = new CodecRegistry();

View File

@ -206,8 +206,7 @@ public class PendingTrace extends ConcurrentLinkedDeque<DDSpan> {
} }
}); });
} catch (final IllegalStateException ex) { } catch (final IllegalStateException ex) {
// The JVM might be shutting down. // The JVM is already shutting down.
log.debug("Error adding shutdown hook.", ex);
} }
} }