diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-2/src/main/java/datadog/trace/instrumentation/elasticsearch2/Elasticsearch2TransportClientInstrumentation.java b/dd-java-agent/instrumentation/elasticsearch-transport-2/src/main/java/datadog/trace/instrumentation/elasticsearch2/Elasticsearch2TransportClientInstrumentation.java index 7aeb49a7db..c9e8ffe5f9 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-2/src/main/java/datadog/trace/instrumentation/elasticsearch2/Elasticsearch2TransportClientInstrumentation.java +++ b/dd-java-agent/instrumentation/elasticsearch-transport-2/src/main/java/datadog/trace/instrumentation/elasticsearch2/Elasticsearch2TransportClientInstrumentation.java @@ -11,6 +11,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.DDAdvice; import datadog.trace.agent.tooling.DDTransformers; +import datadog.trace.agent.tooling.HelperInjector; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.api.DDTags; import io.opentracing.Scope; @@ -45,6 +46,11 @@ public class Elasticsearch2TransportClientInstrumentation extends Instrumenter.C // If we want to be more generic, we could instrument the interface instead: // .and(hasSuperType(named("org.elasticsearch.client.ElasticsearchClient")))) classLoaderHasClasses("org.elasticsearch.plugins.SitePlugin")) + .transform( + new HelperInjector( + "com.google.common.base.Preconditions", + "com.google.common.base.Joiner", + "datadog.trace.instrumentation.elasticsearch2.TransportActionListener")) .transform(DDTransformers.defaultTransformers()) .transform( DDAdvice.create() @@ -78,7 +84,7 @@ public class Elasticsearch2TransportClientInstrumentation extends Instrumenter.C .withTag("elasticsearch.request", actionRequest.getClass().getSimpleName()) .startActive(false); - actionListener = new TransportActionListener<>(actionListener, scope.span()); + actionListener = new TransportActionListener<>(actionRequest, actionListener, scope.span()); return scope; } diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-2/src/main/java/datadog/trace/instrumentation/elasticsearch2/TransportActionListener.java b/dd-java-agent/instrumentation/elasticsearch-transport-2/src/main/java/datadog/trace/instrumentation/elasticsearch2/TransportActionListener.java index 9a8002a181..4c59638a2d 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-2/src/main/java/datadog/trace/instrumentation/elasticsearch2/TransportActionListener.java +++ b/dd-java-agent/instrumentation/elasticsearch-transport-2/src/main/java/datadog/trace/instrumentation/elasticsearch2/TransportActionListener.java @@ -2,30 +2,80 @@ package datadog.trace.instrumentation.elasticsearch2; import static io.opentracing.log.Fields.ERROR_OBJECT; +import com.google.common.base.Joiner; import io.opentracing.Span; import io.opentracing.tag.Tags; import java.util.Collections; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.bulk.BulkShardResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; -public class TransportActionListener implements ActionListener { +public class TransportActionListener implements ActionListener { private final ActionListener listener; private final Span span; - public TransportActionListener(final ActionListener listener, final Span span) { + public TransportActionListener( + final ActionRequest actionRequest, final ActionListener listener, final Span span) { this.listener = listener; this.span = span; + onRequest(actionRequest); + } + + private void onRequest(final ActionRequest request) { + if (request instanceof IndicesRequest) { + final IndicesRequest req = (IndicesRequest) request; + if (req.indices() != null) { + span.setTag("elasticsearch.request.indices", Joiner.on(",").join(req.indices())); + } + } + if (request instanceof DocumentRequest) { + final DocumentRequest req = (DocumentRequest) request; + span.setTag("elasticsearch.request.write.type", req.type()); + span.setTag("elasticsearch.request.write.routing", req.routing()); + } } @Override public void onResponse(final T response) { - if (response instanceof ActionResponse) { - final ActionResponse ar = (ActionResponse) response; - if (ar.remoteAddress() != null) { - Tags.PEER_HOSTNAME.set(span, ar.remoteAddress().getHost()); - Tags.PEER_PORT.set(span, ar.remoteAddress().getPort()); + if (response.remoteAddress() != null) { + Tags.PEER_HOSTNAME.set(span, response.remoteAddress().getHost()); + Tags.PEER_HOST_IPV4.set(span, response.remoteAddress().getAddress()); + Tags.PEER_PORT.set(span, response.remoteAddress().getPort()); + } + + if (response instanceof GetResponse) { + final GetResponse resp = (GetResponse) response; + span.setTag("elasticsearch.type", resp.getType()); + span.setTag("elasticsearch.id", resp.getId()); + span.setTag("elasticsearch.version", resp.getVersion()); + } + + if (response instanceof BroadcastResponse) { + final BroadcastResponse resp = (BroadcastResponse) response; + span.setTag("elasticsearch.shard.broadcast.total", resp.getTotalShards()); + span.setTag("elasticsearch.shard.broadcast.successful", resp.getSuccessfulShards()); + span.setTag("elasticsearch.shard.broadcast.failed", resp.getFailedShards()); + } + + if (response instanceof BulkShardResponse) { + final BulkShardResponse resp = (BulkShardResponse) response; + span.setTag("elasticsearch.shard.bulk.id", resp.getShardId().getId()); + span.setTag("elasticsearch.shard.bulk.index", resp.getShardId().getIndex()); + } + + if (response instanceof BaseNodesResponse) { + final BaseNodesResponse resp = (BaseNodesResponse) response; + if (resp.failures().length > 0) { + span.setTag("elasticsearch.node.failures", resp.failures().length); } + span.setTag("elasticsearch.node.cluster.name", resp.getClusterName().value()); } try { @@ -37,6 +87,7 @@ public class TransportActionListener implements ActionListener { @Override public void onFailure(final Throwable e) { + Tags.ERROR.set(span, true); span.log(Collections.singletonMap(ERROR_OBJECT, e)); try { diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-2/src/test/groovy/Elasticsearch2NodeClientTest.groovy b/dd-java-agent/instrumentation/elasticsearch-transport-2/src/test/groovy/Elasticsearch2NodeClientTest.groovy index 233944c3c5..20d2883c8a 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-2/src/test/groovy/Elasticsearch2NodeClientTest.groovy +++ b/dd-java-agent/instrumentation/elasticsearch-transport-2/src/test/groovy/Elasticsearch2NodeClientTest.groovy @@ -4,6 +4,7 @@ import io.opentracing.tag.Tags import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings +import org.elasticsearch.index.IndexNotFoundException import org.elasticsearch.node.Node import org.elasticsearch.node.NodeBuilder import spock.lang.Shared @@ -38,6 +39,8 @@ class Elasticsearch2NodeClientTest extends AgentTestRunner { .build() testNode = NodeBuilder.newInstance().clusterName("test-cluster").settings(settings).build() testNode.start() + testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000) + TEST_WRITER.waitForTraces(1) } def cleanupSpec() { @@ -64,6 +67,7 @@ class Elasticsearch2NodeClientTest extends AgentTestRunner { resourceName "ClusterHealthAction" operationName "elasticsearch.query" spanType null + parent() tags { "$Tags.COMPONENT.key" "elasticsearch-java" "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT @@ -75,4 +79,185 @@ class Elasticsearch2NodeClientTest extends AgentTestRunner { } } } + + def "test elasticsearch error"() { + when: + client.prepareGet(indexName, indexType, id).get() + + then: + thrown IndexNotFoundException + + and: + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + errored true + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + errorTags IndexNotFoundException, "no such index" + defaultTags() + } + } + } + } + + where: + indexName = "invalid-index" + indexType = "test-type" + id = "1" + } + + def "test elasticsearch get"() { + setup: + def indexResult = client.admin().indices().prepareCreate(indexName).get() + + expect: + indexResult.acknowledged + + when: + client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000) + def emptyResult = client.prepareGet(indexName, indexType, id).get() + + then: + !emptyResult.isExists() + emptyResult.id == id + emptyResult.type == indexType + emptyResult.index == indexName + + when: + def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get() + + then: + createResult.id == id + createResult.type == indexType + createResult.index == indexName + + when: + def result = client.prepareGet(indexName, indexType, id).get() + + then: + result.isExists() + result.id == id + result.type == indexType + result.index == indexName + + and: + assertTraces(TEST_WRITER, 6) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "CreateIndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "CreateIndexAction" + "elasticsearch.request" "CreateIndexRequest" + "elasticsearch.request.indices" indexName + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "ClusterHealthAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "ClusterHealthAction" + "elasticsearch.request" "ClusterHealthRequest" + defaultTags() + } + } + } + trace(2, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version"(-1) + defaultTags() + } + } + } + trace(3, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "PutMappingAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "PutMappingAction" + "elasticsearch.request" "PutMappingRequest" + "elasticsearch.request.indices" indexName + defaultTags() + } + } + } + trace(4, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "IndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "IndexAction" + "elasticsearch.request" "IndexRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.request.write.type" indexType + defaultTags() + } + } + } + trace(5, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version" 1 + defaultTags() + } + } + } + } + + where: + indexName = "test-index" + indexType = "test-type" + id = "1" + } } diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-2/src/test/groovy/Elasticsearch2TransportClientTest.groovy b/dd-java-agent/instrumentation/elasticsearch-transport-2/src/test/groovy/Elasticsearch2TransportClientTest.groovy index 29a67d4344..a284f44921 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-2/src/test/groovy/Elasticsearch2TransportClientTest.groovy +++ b/dd-java-agent/instrumentation/elasticsearch-transport-2/src/test/groovy/Elasticsearch2TransportClientTest.groovy @@ -6,8 +6,10 @@ import org.elasticsearch.client.transport.TransportClient import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.InetSocketTransportAddress +import org.elasticsearch.index.IndexNotFoundException import org.elasticsearch.node.Node import org.elasticsearch.node.NodeBuilder +import org.elasticsearch.transport.RemoteTransportException import spock.lang.Shared import static datadog.trace.agent.test.ListWriterAssert.assertTraces @@ -48,6 +50,8 @@ class Elasticsearch2TransportClientTest extends AgentTestRunner { .build() ).build() client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), TCP_PORT)) + client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000) + TEST_WRITER.waitForTraces(1) } def cleanupSpec() { @@ -74,10 +78,12 @@ class Elasticsearch2TransportClientTest extends AgentTestRunner { resourceName "ClusterHealthAction" operationName "elasticsearch.query" spanType null + parent() tags { "$Tags.COMPONENT.key" "elasticsearch-java" "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" "$Tags.PEER_PORT.key" TCP_PORT "elasticsearch.action" "ClusterHealthAction" "elasticsearch.request" "ClusterHealthRequest" @@ -87,4 +93,200 @@ class Elasticsearch2TransportClientTest extends AgentTestRunner { } } } + + def "test elasticsearch error"() { + when: + client.prepareGet(indexName, indexType, id).get() + + then: + thrown IndexNotFoundException + + and: + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + errored true + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + errorTags RemoteTransportException, String + defaultTags() + } + } + } + } + + where: + indexName = "invalid-index" + indexType = "test-type" + id = "1" + } + + def "test elasticsearch get"() { + setup: + def indexResult = client.admin().indices().prepareCreate(indexName).get() + + expect: + indexResult.acknowledged + + when: + client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000) + def emptyResult = client.prepareGet(indexName, indexType, id).get() + + then: + !emptyResult.isExists() + emptyResult.id == id + emptyResult.type == indexType + emptyResult.index == indexName + + when: + def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get() + + then: + createResult.id == id + createResult.type == indexType + createResult.index == indexName + + when: + def result = client.prepareGet(indexName, indexType, id).get() + + then: + result.isExists() + result.id == id + result.type == indexType + result.index == indexName + + and: + assertTraces(TEST_WRITER, 6) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "CreateIndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "CreateIndexAction" + "elasticsearch.request" "CreateIndexRequest" + "elasticsearch.request.indices" indexName + "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "ClusterHealthAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "ClusterHealthAction" + "elasticsearch.request" "ClusterHealthRequest" + "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + defaultTags() + } + } + } + trace(2, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version"(-1) + defaultTags() + } + } + } + trace(3, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "PutMappingAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "PutMappingAction" + "elasticsearch.request" "PutMappingRequest" + "elasticsearch.request.indices" indexName + defaultTags() + } + } + } + trace(4, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "IndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "IndexAction" + "elasticsearch.request" "IndexRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.request.write.type" indexType + defaultTags() + } + } + } + trace(5, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version" 1 + defaultTags() + } + } + } + } + + where: + indexName = "test-index" + indexType = "test-type" + id = "1" + } } diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-5/src/main/java/datadog/trace/instrumentation/elasticsearch5/Elasticsearch5TransportClientInstrumentation.java b/dd-java-agent/instrumentation/elasticsearch-transport-5/src/main/java/datadog/trace/instrumentation/elasticsearch5/Elasticsearch5TransportClientInstrumentation.java index b4a71d18df..1dbafa4561 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-5/src/main/java/datadog/trace/instrumentation/elasticsearch5/Elasticsearch5TransportClientInstrumentation.java +++ b/dd-java-agent/instrumentation/elasticsearch-transport-5/src/main/java/datadog/trace/instrumentation/elasticsearch5/Elasticsearch5TransportClientInstrumentation.java @@ -11,6 +11,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.DDAdvice; import datadog.trace.agent.tooling.DDTransformers; +import datadog.trace.agent.tooling.HelperInjector; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.api.DDTags; import io.opentracing.Scope; @@ -45,6 +46,11 @@ public class Elasticsearch5TransportClientInstrumentation extends Instrumenter.C // If we want to be more generic, we could instrument the interface instead: // .and(hasSuperType(named("org.elasticsearch.client.ElasticsearchClient")))) classLoaderHasClasses("org.elasticsearch.percolator.TransportMultiPercolateAction")) + .transform( + new HelperInjector( + "com.google.common.base.Preconditions", + "com.google.common.base.Joiner", + "datadog.trace.instrumentation.elasticsearch5.TransportActionListener")) .transform(DDTransformers.defaultTransformers()) .transform( DDAdvice.create() @@ -78,7 +84,7 @@ public class Elasticsearch5TransportClientInstrumentation extends Instrumenter.C .withTag("elasticsearch.request", actionRequest.getClass().getSimpleName()) .startActive(false); - actionListener = new TransportActionListener<>(actionListener, scope.span()); + actionListener = new TransportActionListener<>(actionRequest, actionListener, scope.span()); return scope; } diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-5/src/main/java/datadog/trace/instrumentation/elasticsearch5/TransportActionListener.java b/dd-java-agent/instrumentation/elasticsearch-transport-5/src/main/java/datadog/trace/instrumentation/elasticsearch5/TransportActionListener.java index b3257e5f83..17fecbbc8e 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-5/src/main/java/datadog/trace/instrumentation/elasticsearch5/TransportActionListener.java +++ b/dd-java-agent/instrumentation/elasticsearch-transport-5/src/main/java/datadog/trace/instrumentation/elasticsearch5/TransportActionListener.java @@ -2,20 +2,49 @@ package datadog.trace.instrumentation.elasticsearch5; import static io.opentracing.log.Fields.ERROR_OBJECT; +import com.google.common.base.Joiner; import io.opentracing.Span; import io.opentracing.tag.Tags; import java.util.Collections; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.DocumentRequest; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.bulk.BulkShardResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; public class TransportActionListener implements ActionListener { private final ActionListener listener; private final Span span; - public TransportActionListener(final ActionListener listener, final Span span) { + public TransportActionListener( + final ActionRequest actionRequest, final ActionListener listener, final Span span) { this.listener = listener; this.span = span; + onRequest(actionRequest); + } + + private void onRequest(final ActionRequest request) { + if (request != null) { + span.setTag("elasticsearch.request.description", request.getDescription()); + } + if (request instanceof IndicesRequest) { + final IndicesRequest req = (IndicesRequest) request; + if (req.indices() != null) { + span.setTag("elasticsearch.request.indices", Joiner.on(",").join(req.indices())); + } + } + if (request instanceof DocumentRequest) { + final DocumentRequest req = (DocumentRequest) request; + span.setTag("elasticsearch.request.write.type", req.type()); + span.setTag("elasticsearch.request.write.routing", req.routing()); + } } @Override @@ -26,6 +55,46 @@ public class TransportActionListener implements Action Tags.PEER_PORT.set(span, response.remoteAddress().getPort()); } + if (response instanceof GetResponse) { + final GetResponse resp = (GetResponse) response; + span.setTag("elasticsearch.type", resp.getType()); + span.setTag("elasticsearch.id", resp.getId()); + span.setTag("elasticsearch.version", resp.getVersion()); + } + + if (response instanceof BroadcastResponse) { + final BroadcastResponse resp = (BroadcastResponse) response; + span.setTag("elasticsearch.shard.broadcast.total", resp.getTotalShards()); + span.setTag("elasticsearch.shard.broadcast.successful", resp.getSuccessfulShards()); + span.setTag("elasticsearch.shard.broadcast.failed", resp.getFailedShards()); + } + + if (response instanceof ReplicationResponse) { + final ReplicationResponse resp = (ReplicationResponse) response; + span.setTag("elasticsearch.shard.replication.total", resp.getShardInfo().getTotal()); + span.setTag( + "elasticsearch.shard.replication.successful", resp.getShardInfo().getSuccessful()); + span.setTag("elasticsearch.shard.replication.failed", resp.getShardInfo().getFailed()); + } + + if (response instanceof IndexResponse) { + span.setTag("elasticsearch.response.status", ((IndexResponse) response).status().getStatus()); + } + + if (response instanceof BulkShardResponse) { + final BulkShardResponse resp = (BulkShardResponse) response; + span.setTag("elasticsearch.shard.bulk.id", resp.getShardId().getId()); + span.setTag("elasticsearch.shard.bulk.index", resp.getShardId().getIndexName()); + } + + if (response instanceof BaseNodesResponse) { + final BaseNodesResponse resp = (BaseNodesResponse) response; + if (resp.hasFailures()) { + span.setTag("elasticsearch.node.failures", resp.failures().size()); + } + span.setTag("elasticsearch.node.cluster.name", resp.getClusterName().value()); + } + try { listener.onResponse(response); } finally { @@ -35,6 +104,7 @@ public class TransportActionListener implements Action @Override public void onFailure(final Exception e) { + Tags.ERROR.set(span, true); span.log(Collections.singletonMap(ERROR_OBJECT, e)); try { diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-5/src/test/groovy/Elasticsearch5NodeClientTest.groovy b/dd-java-agent/instrumentation/elasticsearch-transport-5/src/test/groovy/Elasticsearch5NodeClientTest.groovy index 1c8e6a0b38..4da1acfbbb 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-5/src/test/groovy/Elasticsearch5NodeClientTest.groovy +++ b/dd-java-agent/instrumentation/elasticsearch-transport-5/src/test/groovy/Elasticsearch5NodeClientTest.groovy @@ -5,6 +5,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings import org.elasticsearch.env.Environment +import org.elasticsearch.index.IndexNotFoundException import org.elasticsearch.node.Node import org.elasticsearch.node.internal.InternalSettingsPreparer import org.elasticsearch.transport.Netty3Plugin @@ -44,6 +45,8 @@ class Elasticsearch5NodeClientTest extends AgentTestRunner { .build() testNode = new Node(new Environment(InternalSettingsPreparer.prepareSettings(settings)), [Netty3Plugin]) testNode.start() + testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000) + TEST_WRITER.waitForTraces(1) } def cleanupSpec() { @@ -82,4 +85,190 @@ class Elasticsearch5NodeClientTest extends AgentTestRunner { } } } + + def "test elasticsearch error"() { + when: + client.prepareGet(indexName, indexType, id).get() + + then: + thrown IndexNotFoundException + + and: + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + errored true + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + errorTags IndexNotFoundException, "no such index" + defaultTags() + } + } + } + } + + where: + indexName = "invalid-index" + indexType = "test-type" + id = "1" + } + + def "test elasticsearch get"() { + setup: + def indexResult = client.admin().indices().prepareCreate(indexName).get() + + expect: + indexResult.acknowledged + + when: + client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000) + def emptyResult = client.prepareGet(indexName, indexType, id).get() + + then: + !emptyResult.isExists() + emptyResult.id == id + emptyResult.type == indexType + emptyResult.index == indexName + + when: + def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get() + + then: + createResult.id == id + createResult.type == indexType + createResult.index == indexName + createResult.status().status == 201 + + when: + def result = client.prepareGet(indexName, indexType, id).get() + + then: + result.isExists() + result.id == id + result.type == indexType + result.index == indexName + + and: + assertTraces(TEST_WRITER, 6) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "CreateIndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "CreateIndexAction" + "elasticsearch.request" "CreateIndexRequest" + "elasticsearch.request.indices" indexName + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "ClusterHealthAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "ClusterHealthAction" + "elasticsearch.request" "ClusterHealthRequest" + defaultTags() + } + } + } + trace(2, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version"(-1) + defaultTags() + } + } + } + trace(3, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "PutMappingAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "PutMappingAction" + "elasticsearch.request" "PutMappingRequest" + defaultTags() + } + } + } + trace(4, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "IndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "IndexAction" + "elasticsearch.request" "IndexRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.request.write.type" indexType + "elasticsearch.response.status" 201 + "elasticsearch.shard.replication.total" 2 + "elasticsearch.shard.replication.successful" 1 + "elasticsearch.shard.replication.failed" 0 + "elasticsearch.request.description" "index {[test-index][test-type][1], source[{}]}" + defaultTags() + } + } + } + trace(5, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version" 1 + defaultTags() + } + } + } + } + + where: + indexName = "test-index" + indexType = "test-type" + id = "1" + } } diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-5/src/test/groovy/Elasticsearch5TransportClientTest.groovy b/dd-java-agent/instrumentation/elasticsearch-transport-5/src/test/groovy/Elasticsearch5TransportClientTest.groovy index 8e0cc20b61..3bc49699c9 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-5/src/test/groovy/Elasticsearch5TransportClientTest.groovy +++ b/dd-java-agent/instrumentation/elasticsearch-transport-5/src/test/groovy/Elasticsearch5TransportClientTest.groovy @@ -7,9 +7,11 @@ import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.InetSocketTransportAddress import org.elasticsearch.env.Environment +import org.elasticsearch.index.IndexNotFoundException import org.elasticsearch.node.Node import org.elasticsearch.node.internal.InternalSettingsPreparer import org.elasticsearch.transport.Netty3Plugin +import org.elasticsearch.transport.RemoteTransportException import org.elasticsearch.transport.client.PreBuiltTransportClient import spock.lang.Shared @@ -55,6 +57,8 @@ class Elasticsearch5TransportClientTest extends AgentTestRunner { .build() ) client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), TCP_PORT)) + client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000) + TEST_WRITER.waitForTraces(1) } def cleanupSpec() { @@ -96,4 +100,186 @@ class Elasticsearch5TransportClientTest extends AgentTestRunner { } } } + + def "test elasticsearch error"() { + when: + client.prepareGet(indexName, indexType, id).get() + + then: + thrown IndexNotFoundException + + and: + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + errored true + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + errorTags RemoteTransportException, String + defaultTags() + } + } + } + } + + where: + indexName = "invalid-index" + indexType = "test-type" + id = "1" + } + + def "test elasticsearch get"() { + setup: + def indexResult = client.admin().indices().prepareCreate(indexName).get() + + expect: + indexResult.acknowledged + + when: + def emptyResult = client.prepareGet(indexName, indexType, id).get() + + then: + !emptyResult.isExists() + emptyResult.id == id + emptyResult.type == indexType + emptyResult.index == indexName + + when: + def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get() + + then: + createResult.id == id + createResult.type == indexType + createResult.index == indexName + createResult.status().status == 201 + + when: + def result = client.prepareGet(indexName, indexType, id).get() + + then: + result.isExists() + result.id == id + result.type == indexType + result.index == indexName + + and: + assertTraces(TEST_WRITER, 5) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "CreateIndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "CreateIndexAction" + "elasticsearch.request" "CreateIndexRequest" + "elasticsearch.request.indices" indexName + "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version"(-1) + defaultTags() + } + } + } + trace(2, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "PutMappingAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "PutMappingAction" + "elasticsearch.request" "PutMappingRequest" + defaultTags() + } + } + } + trace(3, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "IndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "IndexAction" + "elasticsearch.request" "IndexRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.request.write.type" indexType + "elasticsearch.response.status" 201 + "elasticsearch.shard.replication.total" 2 + "elasticsearch.shard.replication.successful" 1 + "elasticsearch.shard.replication.failed" 0 + "elasticsearch.request.description" "index {[test-index][test-type][1], source[{}]}" + defaultTags() + } + } + } + trace(4, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "127.0.0.1" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version" 1 + defaultTags() + } + } + } + } + + where: + indexName = "test-index" + indexType = "test-type" + id = "1" + } } diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-6/src/main/java/datadog/trace/instrumentation/elasticsearch6/Elasticsearch6TransportClientInstrumentation.java b/dd-java-agent/instrumentation/elasticsearch-transport-6/src/main/java/datadog/trace/instrumentation/elasticsearch6/Elasticsearch6TransportClientInstrumentation.java index cebfec174b..f1e80adac4 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-6/src/main/java/datadog/trace/instrumentation/elasticsearch6/Elasticsearch6TransportClientInstrumentation.java +++ b/dd-java-agent/instrumentation/elasticsearch-transport-6/src/main/java/datadog/trace/instrumentation/elasticsearch6/Elasticsearch6TransportClientInstrumentation.java @@ -11,6 +11,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.DDAdvice; import datadog.trace.agent.tooling.DDTransformers; +import datadog.trace.agent.tooling.HelperInjector; import datadog.trace.agent.tooling.Instrumenter; import datadog.trace.api.DDTags; import io.opentracing.Scope; @@ -49,6 +50,11 @@ public class Elasticsearch6TransportClientInstrumentation extends Instrumenter.C // If we want to be more generic, we could instrument the interface instead: // .and(hasSuperType(named("org.elasticsearch.client.ElasticsearchClient")))) classLoaderHasClasses("org.elasticsearch.client.RestClientBuilder$2")) + .transform( + new HelperInjector( + "com.google.common.base.Preconditions", + "com.google.common.base.Joiner", + "datadog.trace.instrumentation.elasticsearch6.TransportActionListener")) .transform(DDTransformers.defaultTransformers()) .transform( DDAdvice.create() @@ -82,7 +88,7 @@ public class Elasticsearch6TransportClientInstrumentation extends Instrumenter.C .withTag("elasticsearch.request", actionRequest.getClass().getSimpleName()) .startActive(false); - actionListener = new TransportActionListener<>(actionListener, scope.span()); + actionListener = new TransportActionListener<>(actionRequest, actionListener, scope.span()); return scope; } diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-6/src/main/java/datadog/trace/instrumentation/elasticsearch6/TransportActionListener.java b/dd-java-agent/instrumentation/elasticsearch-transport-6/src/main/java/datadog/trace/instrumentation/elasticsearch6/TransportActionListener.java index 46c0fc5196..488f0160c7 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-6/src/main/java/datadog/trace/instrumentation/elasticsearch6/TransportActionListener.java +++ b/dd-java-agent/instrumentation/elasticsearch-transport-6/src/main/java/datadog/trace/instrumentation/elasticsearch6/TransportActionListener.java @@ -2,11 +2,21 @@ package datadog.trace.instrumentation.elasticsearch6; import static io.opentracing.log.Fields.ERROR_OBJECT; +import com.google.common.base.Joiner; import io.opentracing.Span; import io.opentracing.tag.Tags; import java.util.Collections; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.bulk.BulkShardResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse; /** * Most of this class is identical to version 5's instrumentation, but they changed an interface to @@ -17,9 +27,29 @@ public class TransportActionListener implements Action private final ActionListener listener; private final Span span; - public TransportActionListener(final ActionListener listener, final Span span) { + public TransportActionListener( + final ActionRequest actionRequest, final ActionListener listener, final Span span) { this.listener = listener; this.span = span; + onRequest(actionRequest); + } + + private void onRequest(final ActionRequest request) { + if (request != null) { + span.setTag("elasticsearch.request.description", request.getDescription()); + } + if (request instanceof IndicesRequest) { + final IndicesRequest req = (IndicesRequest) request; + if (req.indices() != null) { + span.setTag("elasticsearch.request.indices", Joiner.on(",").join(req.indices())); + } + } + if (request instanceof DocWriteRequest) { + final DocWriteRequest req = (DocWriteRequest) request; + span.setTag("elasticsearch.request.write.type", req.type()); + span.setTag("elasticsearch.request.write.routing", req.routing()); + span.setTag("elasticsearch.request.write.version", req.version()); + } } @Override @@ -30,6 +60,46 @@ public class TransportActionListener implements Action Tags.PEER_PORT.set(span, response.remoteAddress().getPort()); } + if (response instanceof GetResponse) { + final GetResponse resp = (GetResponse) response; + span.setTag("elasticsearch.type", resp.getType()); + span.setTag("elasticsearch.id", resp.getId()); + span.setTag("elasticsearch.version", resp.getVersion()); + } + + if (response instanceof BroadcastResponse) { + final BroadcastResponse resp = (BroadcastResponse) response; + span.setTag("elasticsearch.shard.broadcast.total", resp.getTotalShards()); + span.setTag("elasticsearch.shard.broadcast.successful", resp.getSuccessfulShards()); + span.setTag("elasticsearch.shard.broadcast.failed", resp.getFailedShards()); + } + + if (response instanceof ReplicationResponse) { + final ReplicationResponse resp = (ReplicationResponse) response; + span.setTag("elasticsearch.shard.replication.total", resp.getShardInfo().getTotal()); + span.setTag( + "elasticsearch.shard.replication.successful", resp.getShardInfo().getSuccessful()); + span.setTag("elasticsearch.shard.replication.failed", resp.getShardInfo().getFailed()); + } + + if (response instanceof IndexResponse) { + span.setTag("elasticsearch.response.status", ((IndexResponse) response).status().getStatus()); + } + + if (response instanceof BulkShardResponse) { + final BulkShardResponse resp = (BulkShardResponse) response; + span.setTag("elasticsearch.shard.bulk.id", resp.getShardId().getId()); + span.setTag("elasticsearch.shard.bulk.index", resp.getShardId().getIndexName()); + } + + if (response instanceof BaseNodesResponse) { + final BaseNodesResponse resp = (BaseNodesResponse) response; + if (resp.hasFailures()) { + span.setTag("elasticsearch.node.failures", resp.failures().size()); + } + span.setTag("elasticsearch.node.cluster.name", resp.getClusterName().value()); + } + try { listener.onResponse(response); } finally { @@ -39,6 +109,7 @@ public class TransportActionListener implements Action @Override public void onFailure(final Exception e) { + Tags.ERROR.set(span, true); span.log(Collections.singletonMap(ERROR_OBJECT, e)); try { diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-6/src/test/groovy/Elasticsearch6NodeClientTest.groovy b/dd-java-agent/instrumentation/elasticsearch-transport-6/src/test/groovy/Elasticsearch6NodeClientTest.groovy index b4954ed782..2b69ab27db 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-6/src/test/groovy/Elasticsearch6NodeClientTest.groovy +++ b/dd-java-agent/instrumentation/elasticsearch-transport-6/src/test/groovy/Elasticsearch6NodeClientTest.groovy @@ -4,6 +4,7 @@ import io.opentracing.tag.Tags import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings +import org.elasticsearch.index.IndexNotFoundException import org.elasticsearch.node.InternalSettingsPreparer import org.elasticsearch.node.Node import org.elasticsearch.transport.Netty4Plugin @@ -41,6 +42,8 @@ class Elasticsearch6NodeClientTest extends AgentTestRunner { .build() testNode = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin]) testNode.start() + testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000) + TEST_WRITER.waitForTraces(1) } def cleanupSpec() { @@ -53,9 +56,9 @@ class Elasticsearch6NodeClientTest extends AgentTestRunner { def "test elasticsearch status"() { setup: - def result = client.admin().cluster().health(new ClusterHealthRequest()) + def result = client.admin().cluster().health(new ClusterHealthRequest()).get() - def status = result.get().status + def status = result.status expect: status.name() == "GREEN" @@ -78,4 +81,175 @@ class Elasticsearch6NodeClientTest extends AgentTestRunner { } } } + + def "test elasticsearch error"() { + when: + client.prepareGet(indexName, indexType, id).get() + + then: + thrown IndexNotFoundException + + and: + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + errored true + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + errorTags IndexNotFoundException, "no such index" + defaultTags() + } + } + } + } + + where: + indexName = "invalid-index" + indexType = "test-type" + id = "1" + } + + def "test elasticsearch get"() { + setup: + def indexResult = client.admin().indices().prepareCreate(indexName).get() + + expect: + indexResult.index() == indexName + + when: + def emptyResult = client.prepareGet(indexName, indexType, id).get() + + then: + !emptyResult.isExists() + emptyResult.id == id + emptyResult.type == indexType + emptyResult.index == indexName + + when: + def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get() + + then: + createResult.id == id + createResult.type == indexType + createResult.index == indexName + createResult.status().status == 201 + + when: + def result = client.prepareGet(indexName, indexType, id).get() + + then: + result.isExists() + result.id == id + result.type == indexType + result.index == indexName + + and: + assertTraces(TEST_WRITER, 5) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "CreateIndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "CreateIndexAction" + "elasticsearch.request" "CreateIndexRequest" + "elasticsearch.request.indices" indexName + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version"(-1) + defaultTags() + } + } + } + trace(2, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "PutMappingAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "PutMappingAction" + "elasticsearch.request" "PutMappingRequest" + defaultTags() + } + } + } + trace(3, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "IndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "IndexAction" + "elasticsearch.request" "IndexRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.request.write.type" indexType + "elasticsearch.request.write.version"(-3) + "elasticsearch.response.status" 201 + "elasticsearch.shard.replication.total" 2 + "elasticsearch.shard.replication.successful" 1 + "elasticsearch.shard.replication.failed" 0 + "elasticsearch.request.description" "index {[test-index][test-type][1], source[{}]}" + defaultTags() + } + } + } + trace(4, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version" 1 + defaultTags() + } + } + } + } + + where: + indexName = "test-index" + indexType = "test-type" + id = "1" + } } diff --git a/dd-java-agent/instrumentation/elasticsearch-transport-6/src/test/groovy/Elasticsearch6TransportClientTest.groovy b/dd-java-agent/instrumentation/elasticsearch-transport-6/src/test/groovy/Elasticsearch6TransportClientTest.groovy index 023369d5a6..0889db29a3 100644 --- a/dd-java-agent/instrumentation/elasticsearch-transport-6/src/test/groovy/Elasticsearch6TransportClientTest.groovy +++ b/dd-java-agent/instrumentation/elasticsearch-transport-6/src/test/groovy/Elasticsearch6TransportClientTest.groovy @@ -6,9 +6,11 @@ import org.elasticsearch.client.transport.TransportClient import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.TransportAddress +import org.elasticsearch.index.IndexNotFoundException import org.elasticsearch.node.InternalSettingsPreparer import org.elasticsearch.node.Node import org.elasticsearch.transport.Netty4Plugin +import org.elasticsearch.transport.RemoteTransportException import org.elasticsearch.transport.client.PreBuiltTransportClient import spock.lang.Shared @@ -52,6 +54,8 @@ class Elasticsearch6TransportClientTest extends AgentTestRunner { .build() ) client.addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), TCP_PORT)) + client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(5000) + TEST_WRITER.waitForTraces(1) } def cleanupSpec() { @@ -92,4 +96,187 @@ class Elasticsearch6TransportClientTest extends AgentTestRunner { } } } + + def "test elasticsearch error"() { + when: + client.prepareGet(indexName, indexType, id).get() + + then: + thrown IndexNotFoundException + + and: + assertTraces(TEST_WRITER, 1) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + errored true + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + errorTags RemoteTransportException, String + defaultTags() + } + } + } + } + + where: + indexName = "invalid-index" + indexType = "test-type" + id = "1" + } + + def "test elasticsearch get"() { + setup: + def indexResult = client.admin().indices().prepareCreate(indexName).get() + + expect: + indexResult.index() == indexName + + when: + def emptyResult = client.prepareGet(indexName, indexType, id).get() + + then: + !emptyResult.isExists() + emptyResult.id == id + emptyResult.type == indexType + emptyResult.index == indexName + + when: + def createResult = client.prepareIndex(indexName, indexType, id).setSource([:]).get() + + then: + createResult.id == id + createResult.type == indexType + createResult.index == indexName + createResult.status().status == 201 + + when: + def result = client.prepareGet(indexName, indexType, id).get() + + then: + result.isExists() + result.id == id + result.type == indexType + result.index == indexName + + and: + assertTraces(TEST_WRITER, 5) { + trace(0, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "CreateIndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "localhost" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "CreateIndexAction" + "elasticsearch.request" "CreateIndexRequest" + "elasticsearch.request.indices" indexName + defaultTags() + } + } + } + trace(1, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "localhost" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version"(-1) + defaultTags() + } + } + } + trace(2, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "PutMappingAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "elasticsearch.action" "PutMappingAction" + "elasticsearch.request" "PutMappingRequest" + defaultTags() + } + } + } + trace(3, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "IndexAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "localhost" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "IndexAction" + "elasticsearch.request" "IndexRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.request.write.type" indexType + "elasticsearch.request.write.version"(-3) + "elasticsearch.response.status" 201 + "elasticsearch.shard.replication.total" 2 + "elasticsearch.shard.replication.successful" 1 + "elasticsearch.shard.replication.failed" 0 + "elasticsearch.request.description" "index {[test-index][test-type][1], source[{}]}" + defaultTags() + } + } + } + trace(4, 1) { + span(0) { + serviceName "elasticsearch" + resourceName "GetAction" + operationName "elasticsearch.query" + spanType null + tags { + "$Tags.COMPONENT.key" "elasticsearch-java" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "$Tags.PEER_HOSTNAME.key" "localhost" + "$Tags.PEER_HOST_IPV4.key" "127.0.0.1" + "$Tags.PEER_PORT.key" TCP_PORT + "elasticsearch.action" "GetAction" + "elasticsearch.request" "GetRequest" + "elasticsearch.request.indices" indexName + "elasticsearch.type" indexType + "elasticsearch.id" "1" + "elasticsearch.version" 1 + defaultTags() + } + } + } + } + + where: + indexName = "test-index" + indexType = "test-type" + id = "1" + } } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/ListWriterAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/ListWriterAssert.groovy index 47589c04f7..0cd127873b 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/ListWriterAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/ListWriterAssert.groovy @@ -1,6 +1,10 @@ package datadog.trace.agent.test import datadog.trace.common.writer.ListWriter +import org.codehaus.groovy.runtime.powerassert.PowerAssertionError +import org.spockframework.runtime.Condition +import org.spockframework.runtime.ConditionNotSatisfiedError +import org.spockframework.runtime.model.TextPosition import static datadog.trace.agent.test.TraceAssert.assertTrace @@ -14,21 +18,39 @@ class ListWriterAssert { size = writer.size() } - static ListWriterAssert assertTraces(ListWriter writer, int expectedSize, - @DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { - writer.waitForTraces(expectedSize) - assert writer.size() == expectedSize - def asserter = new ListWriterAssert(writer) - def clone = (Closure) spec.clone() - clone.delegate = asserter - clone.resolveStrategy = Closure.DELEGATE_FIRST - clone(asserter) - asserter.assertTracesAllVerified() - asserter + static void assertTraces(ListWriter writer, int expectedSize, + @DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + try { + writer.waitForTraces(expectedSize) + assert writer.size() == expectedSize + def asserter = new ListWriterAssert(writer) + def clone = (Closure) spec.clone() + clone.delegate = asserter + clone.resolveStrategy = Closure.DELEGATE_FIRST + clone(asserter) + asserter.assertTracesAllVerified() + } catch (PowerAssertionError e) { + def stackLine = null + for (int i = 0; i < e.stackTrace.length; i++) { + def className = e.stackTrace[i].className + def skip = className.startsWith("org.codehaus.groovy.") || + className.startsWith("datadog.trace.agent.test.") || + className.startsWith("sun.reflect.") || + className.startsWith("groovy.lang.") || + className.startsWith("java.lang.") + if (skip) { + continue + } + stackLine = e.stackTrace[i] + break + } + def condition = new Condition(null, "$stackLine", TextPosition.create(stackLine == null ? 0 : stackLine.lineNumber, 0), e.message, null, e) + throw new ConditionNotSatisfiedError(condition, e) + } } - TraceAssert trace(int index, int expectedSize, - @DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + void trace(int index, int expectedSize, + @DelegatesTo(value = TraceAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { if (index >= size) { throw new ArrayIndexOutOfBoundsException(index) } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/SpanAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/SpanAssert.groovy index 124d35ac70..4809d64677 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/SpanAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/SpanAssert.groovy @@ -11,8 +11,8 @@ class SpanAssert { this.span = span } - static SpanAssert assertSpan(DDSpan span, - @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + static void assertSpan(DDSpan span, + @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { def asserter = new SpanAssert(span) def clone = (Closure) spec.clone() clone.delegate = asserter @@ -51,7 +51,7 @@ class SpanAssert { assert span.isError() == errored } - def tags(@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { - return assertTags(span, spec) + void tags(@DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + assertTags(span, spec) } } diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy index 8851d5670f..7710627c3e 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TagsAssert.groovy @@ -10,8 +10,8 @@ class TagsAssert { this.tags = new TreeMap(span.tags) } - static TagsAssert assertTags(DDSpan span, - @DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + static void assertTags(DDSpan span, + @DelegatesTo(value = TagsAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { def asserter = new TagsAssert(span) def clone = (Closure) spec.clone() clone.delegate = asserter @@ -25,35 +25,35 @@ class TagsAssert { assertedTags.add("thread.name") assertedTags.add("thread.id") - tags["thread.name"] != null - tags["thread.id"] != null + assert tags["thread.name"] != null + assert tags["thread.id"] != null } def errorTags(Class errorType) { errorTags(errorType, null) } - def errorTags(Class errorType, String message) { - assertedTags.add("error") - assertedTags.add("error.type") - assertedTags.add("error.stack") + def errorTags(Class errorType, Object message) { + methodMissing("error", [true].toArray()) + methodMissing("error.type", [errorType.name].toArray()) + methodMissing("error.stack", [String].toArray()) if (message != null) { - assertedTags.add("error.msg") - tags["error.msg"] == message + methodMissing("error.msg", [message].toArray()) } - - tags["error"] == true - tags["error.type"] == errorType - tags["error.stack"] instanceof String } def methodMissing(String name, args) { if (args.length > 1) { - throw new IllegalArgumentException(args) + throw new IllegalArgumentException(args.toString()) } assertedTags.add(name) - assert tags[name] == args[0] + def arg = args[0] + if (arg instanceof Class) { + assert ((Class) arg).isInstance(tags[name]) + } else { + assert tags[name] == arg + } } void assertTracesAllVerified() { diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TraceAssert.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TraceAssert.groovy index 99301e80bc..52147892f7 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TraceAssert.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/TraceAssert.groovy @@ -14,8 +14,8 @@ class TraceAssert { size = trace.size() } - static TraceAssert assertTrace(List trace, int expectedSize, - @DelegatesTo(value = File, strategy = Closure.DELEGATE_FIRST) Closure spec) { + static void assertTrace(List trace, int expectedSize, + @DelegatesTo(value = File, strategy = Closure.DELEGATE_FIRST) Closure spec) { assert trace.size() == expectedSize def asserter = new TraceAssert(trace) def clone = (Closure) spec.clone() @@ -30,7 +30,7 @@ class TraceAssert { trace.get(index) } - SpanAssert span(int index, @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { + void span(int index, @DelegatesTo(value = SpanAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) { if (index >= size) { throw new ArrayIndexOutOfBoundsException(index) } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java index fe3e81aca4..3df6f9f075 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDSpanContext.java @@ -232,7 +232,7 @@ public class DDSpanContext implements io.opentracing.SpanContext { * @param value the value of the tag. tags with null values are ignored. */ public synchronized void setTag(final String tag, final Object value) { - if (value == null) { + if (value == null || (value instanceof String && ((String) value).isEmpty())) { tags.remove(tag); return; } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java index a6ff6c11a7..3ea6806b21 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/DDTracer.java @@ -126,8 +126,7 @@ public class DDTracer implements io.opentracing.Tracer { } }); } catch (final IllegalStateException ex) { - // The JVM might be shutting down. - log.debug("Error adding shutdown hook.", ex); + // The JVM is already shutting down. } registry = new CodecRegistry(); diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java b/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java index 06f0f4b19d..c1e0791c9c 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/PendingTrace.java @@ -206,8 +206,7 @@ public class PendingTrace extends ConcurrentLinkedDeque { } }); } catch (final IllegalStateException ex) { - // The JVM might be shutting down. - log.debug("Error adding shutdown hook.", ex); + // The JVM is already shutting down. } }