From 514553eac25efacb251a7a17a0f1cd72d2d0bf8f Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Thu, 19 Aug 2021 12:59:47 +0300 Subject: [PATCH] Context propagation to elasticsearch-transport callbacks (#3861) --- .../javaagent/build.gradle.kts | 1 + .../v5_0/AbstractClientInstrumentation.java | 6 +- .../v5_0/TransportActionListener.java | 16 ++++- .../Elasticsearch5NodeClientTest.groovy | 70 ++++++++++++++---- .../Elasticsearch5TransportClientTest.groovy | 61 +++++++++++++--- .../javaagent/build.gradle.kts | 1 + .../v5_3/AbstractClientInstrumentation.java | 6 +- .../v5_3/TransportActionListener.java | 16 ++++- .../Elasticsearch53NodeClientTest.groovy | 72 +++++++++++++++---- .../Elasticsearch53TransportClientTest.groovy | 61 +++++++++++++--- .../javaagent/build.gradle.kts | 1 + .../v6_0/AbstractClientInstrumentation.java | 6 +- .../v6_0/TransportActionListener.java | 16 ++++- .../Elasticsearch6NodeClientTest.groovy | 71 ++++++++++++++---- .../Elasticsearch6TransportClientTest.groovy | 61 +++++++++++++--- .../build.gradle.kts | 10 +++ .../AbstractElasticsearchClientTest.groovy | 65 +++++++++++++++++ ...AbstractElasticsearchNodeClientTest.groovy | 42 +++++++++++ ...actElasticsearchTransportClientTest.groovy | 42 +++++++++++ settings.gradle.kts | 1 + 20 files changed, 536 insertions(+), 89 deletions(-) create mode 100644 instrumentation/elasticsearch/elasticsearch-transport-testing/build.gradle.kts create mode 100644 instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchClientTest.groovy create mode 100644 instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchNodeClientTest.groovy create mode 100644 instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchTransportClientTest.groovy diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/build.gradle.kts b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/build.gradle.kts index 328a39559f..2690ac2692 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/build.gradle.kts +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/build.gradle.kts @@ -33,6 +33,7 @@ dependencies { testInstrumentation(project(":instrumentation:apache-httpasyncclient-4.1:javaagent")) testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent")) + testImplementation(project(":instrumentation:elasticsearch:elasticsearch-transport-testing")) testImplementation("org.apache.logging.log4j:log4j-core:2.11.0") testImplementation("org.apache.logging.log4j:log4j-api:2.11.0") diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_0/AbstractClientInstrumentation.java b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_0/AbstractClientInstrumentation.java index 373993779b..3b315143f5 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_0/AbstractClientInstrumentation.java +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_0/AbstractClientInstrumentation.java @@ -54,11 +54,13 @@ public class AbstractClientInstrumentation implements TypeInstrumentation { @Advice.Argument(value = 2, readOnly = false) ActionListener actionListener) { - context = tracer().startSpan(currentContext(), null, action); + Context parentContext = currentContext(); + context = tracer().startSpan(parentContext, null, action); scope = context.makeCurrent(); tracer().onRequest(context, action.getClass(), actionRequest.getClass()); - actionListener = new TransportActionListener<>(actionRequest, actionListener, context); + actionListener = + new TransportActionListener<>(actionRequest, actionListener, context, parentContext); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_0/TransportActionListener.java b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_0/TransportActionListener.java index c18cd0cee9..db03f02fb3 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_0/TransportActionListener.java +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_0/TransportActionListener.java @@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; @@ -34,11 +35,16 @@ public class TransportActionListener implements Action private final ActionListener listener; private final Context context; + private final Context parentContext; public TransportActionListener( - ActionRequest actionRequest, ActionListener listener, Context context) { + ActionRequest actionRequest, + ActionListener listener, + Context context, + Context parentContext) { this.listener = listener; this.context = context; + this.parentContext = parentContext; onRequest(actionRequest); } @@ -124,12 +130,16 @@ public class TransportActionListener implements Action } tracer().end(context); - listener.onResponse(response); + try (Scope ignored = parentContext.makeCurrent()) { + listener.onResponse(response); + } } @Override public void onFailure(Exception e) { tracer().endExceptionally(context, e); - listener.onFailure(e); + try (Scope ignored = parentContext.makeCurrent()) { + listener.onFailure(e); + } } } diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/test/groovy/Elasticsearch5NodeClientTest.groovy b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/test/groovy/Elasticsearch5NodeClientTest.groovy index 97ff1b1bc3..480a611e2d 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/test/groovy/Elasticsearch5NodeClientTest.groovy +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/test/groovy/Elasticsearch5NodeClientTest.groovy @@ -4,12 +4,12 @@ */ import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.StatusCode.ERROR import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest +import org.elasticsearch.client.Client import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings import org.elasticsearch.env.Environment @@ -18,8 +18,9 @@ import org.elasticsearch.node.Node import org.elasticsearch.node.internal.InternalSettingsPreparer import org.elasticsearch.transport.Netty3Plugin import spock.lang.Shared +import spock.lang.Unroll -class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification { +class Elasticsearch5NodeClientTest extends AbstractElasticsearchNodeClientTest { public static final long TIMEOUT = 10000 // 10 seconds @Shared @@ -29,7 +30,8 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification { @Shared String clusterName = UUID.randomUUID().toString() - def client = testNode.client() + @Shared + Client client def setupSpec() { @@ -48,10 +50,11 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification { .build() testNode = new Node(new Environment(InternalSettingsPreparer.prepareSettings(settings)), [Netty3Plugin]) testNode.start() + client = testNode.client() runWithSpan("setup") { // this may potentially create multiple requests and therefore multiple spans, so we wrap this call // into a top level trace to get exactly one trace in the result. - testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT) + client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT) } ignoreTracesAndClear(1) } @@ -64,20 +67,32 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification { } } - def "test elasticsearch status"() { - setup: - def result = client.admin().cluster().health(new ClusterHealthRequest()) + @Override + Client client() { + client + } - def clusterHealthStatus = result.get().status + @Unroll + def "test elasticsearch status #callKind"() { + setup: + def clusterHealthStatus = runWithSpan("parent") { + call.call() + } expect: clusterHealthStatus.name() == "GREEN" assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { name "ClusterHealthAction" kind CLIENT + childOf(span(0)) attributes { "${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch" "${SemanticAttributes.DB_OPERATION.key}" "ClusterHealthAction" @@ -85,25 +100,46 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification { "elasticsearch.request" "ClusterHealthRequest" } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } + + where: + callKind | call + "sync" | { clusterHealthSync() } + "async" | { clusterHealthAsync() } } - def "test elasticsearch error"() { + @Unroll + def "test elasticsearch error #callKind"() { when: - client.prepareGet(indexName, indexType, id).get() + runWithSpan("parent") { + call.call(indexName, indexType, id) + } then: thrown IndexNotFoundException and: assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + status ERROR + errorEvent IndexNotFoundException, "no such index" + kind INTERNAL + hasNoParent() + } + span(1) { name "GetAction" status ERROR errorEvent IndexNotFoundException, "no such index" kind CLIENT + childOf(span(0)) attributes { "${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch" "${SemanticAttributes.DB_OPERATION.key}" "GetAction" @@ -112,6 +148,11 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification { "elasticsearch.request.indices" indexName } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } @@ -119,6 +160,9 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification { indexName = "invalid-index" indexType = "test-type" id = "1" + callKind | call + "sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) } + "async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) } } def "test elasticsearch get"() { diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/test/groovy/Elasticsearch5TransportClientTest.groovy b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/test/groovy/Elasticsearch5TransportClientTest.groovy index 986e0a2fe7..7fc1cb850e 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/test/groovy/Elasticsearch5TransportClientTest.groovy +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.0/javaagent/src/test/groovy/Elasticsearch5TransportClientTest.groovy @@ -4,12 +4,11 @@ */ import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.StatusCode.ERROR import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.client.transport.TransportClient import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings @@ -23,8 +22,9 @@ import org.elasticsearch.transport.RemoteTransportException import org.elasticsearch.transport.TransportService import org.elasticsearch.transport.client.PreBuiltTransportClient import spock.lang.Shared +import spock.lang.Unroll -class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecification { +class Elasticsearch5TransportClientTest extends AbstractElasticsearchTransportClientTest { public static final long TIMEOUT = 10000 // 10 seconds @Shared @@ -80,18 +80,29 @@ class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecificatio } } - def "test elasticsearch status"() { - setup: - def result = client.admin().cluster().health(new ClusterHealthRequest()) + @Override + TransportClient client() { + client + } - def clusterHealthStatus = result.get().status + @Unroll + def "test elasticsearch status #callKind"() { + setup: + def clusterHealthStatus = runWithSpan("parent") { + call.call() + } expect: clusterHealthStatus.name() == "GREEN" assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { name "ClusterHealthAction" kind CLIENT attributes { @@ -104,24 +115,44 @@ class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecificatio "elasticsearch.request" "ClusterHealthRequest" } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } + + where: + callKind | call + "sync" | { clusterHealthSync() } + "async" | { clusterHealthAsync() } } - def "test elasticsearch error"() { + def "test elasticsearch error #callKind"() { when: - client.prepareGet(indexName, indexType, id).get() + runWithSpan("parent") { + call.call(indexName, indexType, id) + } then: thrown IndexNotFoundException and: assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + status ERROR + errorEvent IndexNotFoundException, "no such index" + kind INTERNAL + hasNoParent() + } + span(1) { name "GetAction" kind CLIENT status ERROR + childOf(span(0)) errorEvent RemoteTransportException, String attributes { "${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch" @@ -131,6 +162,11 @@ class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecificatio "elasticsearch.request.indices" indexName } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } @@ -138,6 +174,9 @@ class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecificatio indexName = "invalid-index" indexType = "test-type" id = "1" + callKind | call + "sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) } + "async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) } } def "test elasticsearch get"() { diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/build.gradle.kts b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/build.gradle.kts index 9110700ef4..8c417326e8 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/build.gradle.kts +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/build.gradle.kts @@ -38,6 +38,7 @@ dependencies { testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent")) testInstrumentation(project(":instrumentation:spring:spring-data-1.8:javaagent")) + testImplementation(project(":instrumentation:elasticsearch:elasticsearch-transport-testing")) testImplementation("org.apache.logging.log4j:log4j-core:2.11.0") testImplementation("org.apache.logging.log4j:log4j-api:2.11.0") diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_3/AbstractClientInstrumentation.java b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_3/AbstractClientInstrumentation.java index 0190dfb1d9..999c1c534b 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_3/AbstractClientInstrumentation.java +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_3/AbstractClientInstrumentation.java @@ -54,11 +54,13 @@ public class AbstractClientInstrumentation implements TypeInstrumentation { @Advice.Argument(value = 2, readOnly = false) ActionListener actionListener) { - context = tracer().startSpan(currentContext(), null, action); + Context parentContext = currentContext(); + context = tracer().startSpan(parentContext, null, action); scope = context.makeCurrent(); tracer().onRequest(context, action.getClass(), actionRequest.getClass()); - actionListener = new TransportActionListener<>(actionRequest, actionListener, context); + actionListener = + new TransportActionListener<>(actionRequest, actionListener, context, parentContext); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_3/TransportActionListener.java b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_3/TransportActionListener.java index f8398264d7..3eab540302 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_3/TransportActionListener.java +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v5_3/TransportActionListener.java @@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; @@ -34,11 +35,16 @@ public class TransportActionListener implements Action private final ActionListener listener; private final Context context; + private final Context parentContext; public TransportActionListener( - ActionRequest actionRequest, ActionListener listener, Context context) { + ActionRequest actionRequest, + ActionListener listener, + Context context, + Context parentContext) { this.listener = listener; this.context = context; + this.parentContext = parentContext; onRequest(actionRequest); } @@ -125,12 +131,16 @@ public class TransportActionListener implements Action } tracer().end(context); - listener.onResponse(response); + try (Scope ignored = parentContext.makeCurrent()) { + listener.onResponse(response); + } } @Override public void onFailure(Exception e) { tracer().endExceptionally(context, e); - listener.onFailure(e); + try (Scope ignored = parentContext.makeCurrent()) { + listener.onFailure(e); + } } } diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/test/groovy/Elasticsearch53NodeClientTest.groovy b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/test/groovy/Elasticsearch53NodeClientTest.groovy index 0d7ceec7cc..e50143e081 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/test/groovy/Elasticsearch53NodeClientTest.groovy +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/test/groovy/Elasticsearch53NodeClientTest.groovy @@ -4,13 +4,13 @@ */ import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.StatusCode.ERROR import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest +import org.elasticsearch.client.Client import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings import org.elasticsearch.env.Environment @@ -19,8 +19,9 @@ import org.elasticsearch.node.InternalSettingsPreparer import org.elasticsearch.node.Node import org.elasticsearch.transport.Netty3Plugin import spock.lang.Shared +import spock.lang.Unroll -class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification { +class Elasticsearch53NodeClientTest extends AbstractElasticsearchNodeClientTest { public static final long TIMEOUT = 10000 // 10 seconds @Shared @@ -30,7 +31,8 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification { @Shared String clusterName = UUID.randomUUID().toString() - def client = testNode.client() + @Shared + Client client def setupSpec() { @@ -49,12 +51,13 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification { .build() testNode = new Node(new Environment(InternalSettingsPreparer.prepareSettings(settings)), [Netty3Plugin]) testNode.start() + client = testNode.client() runWithSpan("setup") { // this may potentially create multiple requests and therefore multiple spans, so we wrap this call // into a top level trace to get exactly one trace in the result. - testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT) + client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT) // disable periodic refresh in InternalClusterInfoService as it creates spans that tests don't expect - testNode.client().admin().cluster().updateSettings(new ClusterUpdateSettingsRequest().transientSettings(["cluster.routing.allocation.disk.threshold_enabled": false])) + client.admin().cluster().updateSettings(new ClusterUpdateSettingsRequest().transientSettings(["cluster.routing.allocation.disk.threshold_enabled": false])) } ignoreTracesAndClear(1) } @@ -67,20 +70,32 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification { } } - def "test elasticsearch status"() { - setup: - def result = client.admin().cluster().health(new ClusterHealthRequest()) + @Override + Client client() { + client + } - def clusterHealthStatus = result.get().status + @Unroll + def "test elasticsearch status #callKind"() { + setup: + def clusterHealthStatus = runWithSpan("parent") { + call.call() + } expect: clusterHealthStatus.name() == "GREEN" assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { name "ClusterHealthAction" kind CLIENT + childOf(span(0)) attributes { "${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch" "${SemanticAttributes.DB_OPERATION.key}" "ClusterHealthAction" @@ -88,24 +103,45 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification { "elasticsearch.request" "ClusterHealthRequest" } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } + + where: + callKind | call + "sync" | { clusterHealthSync() } + "async" | { clusterHealthAsync() } } - def "test elasticsearch error"() { + @Unroll + def "test elasticsearch error #callKind"() { when: - client.prepareGet(indexName, indexType, id).get() + runWithSpan("parent") { + call.call(indexName, indexType, id) + } then: thrown IndexNotFoundException and: assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + status ERROR + errorEvent IndexNotFoundException, "no such index" + kind INTERNAL + hasNoParent() + } + span(1) { name "GetAction" kind CLIENT status ERROR + childOf(span(0)) errorEvent IndexNotFoundException, "no such index" attributes { "${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch" @@ -115,6 +151,11 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification { "elasticsearch.request.indices" indexName } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } @@ -122,6 +163,9 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification { indexName = "invalid-index" indexType = "test-type" id = "1" + callKind | call + "sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) } + "async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) } } def "test elasticsearch get"() { diff --git a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/test/groovy/Elasticsearch53TransportClientTest.groovy b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/test/groovy/Elasticsearch53TransportClientTest.groovy index ae1738d331..f46ff2451d 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/test/groovy/Elasticsearch53TransportClientTest.groovy +++ b/instrumentation/elasticsearch/elasticsearch-transport-5.3/javaagent/src/test/groovy/Elasticsearch53TransportClientTest.groovy @@ -4,12 +4,11 @@ */ import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.StatusCode.ERROR import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest import org.elasticsearch.client.transport.TransportClient import org.elasticsearch.common.io.FileSystemUtils @@ -24,8 +23,9 @@ import org.elasticsearch.transport.RemoteTransportException import org.elasticsearch.transport.TransportService import org.elasticsearch.transport.client.PreBuiltTransportClient import spock.lang.Shared +import spock.lang.Unroll -class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecification { +class Elasticsearch53TransportClientTest extends AbstractElasticsearchTransportClientTest { public static final long TIMEOUT = 10000 // 10 seconds @Shared @@ -85,20 +85,32 @@ class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecificati } } - def "test elasticsearch status"() { - setup: - def result = client.admin().cluster().health(new ClusterHealthRequest()) + @Override + TransportClient client() { + client + } - def clusterHealthStatus = result.get().status + @Unroll + def "test elasticsearch status #callKind"() { + setup: + def clusterHealthStatus = runWithSpan("parent") { + call.call() + } expect: clusterHealthStatus.name() == "GREEN" assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { name "ClusterHealthAction" kind CLIENT + childOf(span(0)) attributes { "${SemanticAttributes.NET_PEER_NAME.key}" tcpPublishAddress.host == tcpPublishAddress.address ? null : tcpPublishAddress.address "${SemanticAttributes.NET_PEER_IP.key}" tcpPublishAddress.address @@ -109,21 +121,40 @@ class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecificati "elasticsearch.request" "ClusterHealthRequest" } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } + + where: + callKind | call + "sync" | { clusterHealthSync() } + "async" | { clusterHealthAsync() } } - def "test elasticsearch error"() { + def "test elasticsearch error #callKind"() { when: - client.prepareGet(indexName, indexType, id).get() + runWithSpan("parent") { + call.call(indexName, indexType, id) + } then: thrown IndexNotFoundException and: assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + status ERROR + errorEvent IndexNotFoundException, "no such index" + kind INTERNAL + hasNoParent() + } + span(1) { name "GetAction" kind CLIENT status ERROR @@ -136,6 +167,11 @@ class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecificati "elasticsearch.request.indices" indexName } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } @@ -143,6 +179,9 @@ class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecificati indexName = "invalid-index" indexType = "test-type" id = "1" + callKind | call + "sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) } + "async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) } } def "test elasticsearch get"() { diff --git a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/build.gradle.kts b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/build.gradle.kts index f4ce6c0080..42c178104e 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/build.gradle.kts +++ b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { testLibrary("org.elasticsearch.plugin:transport-netty4-client:6.0.0") + testImplementation(project(":instrumentation:elasticsearch:elasticsearch-transport-testing")) testImplementation("org.apache.logging.log4j:log4j-core:2.11.0") testImplementation("org.apache.logging.log4j:log4j-api:2.11.0") } diff --git a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v6_0/AbstractClientInstrumentation.java b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v6_0/AbstractClientInstrumentation.java index 84d6dcfd51..48b74d1370 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v6_0/AbstractClientInstrumentation.java +++ b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v6_0/AbstractClientInstrumentation.java @@ -58,10 +58,12 @@ public class AbstractClientInstrumentation implements TypeInstrumentation { @Advice.Argument(value = 2, readOnly = false) ActionListener actionListener) { - context = tracer().startSpan(currentContext(), null, action); + Context parentContext = currentContext(); + context = tracer().startSpan(parentContext, null, action); scope = context.makeCurrent(); tracer().onRequest(context, action.getClass(), actionRequest.getClass()); - actionListener = new TransportActionListener<>(actionRequest, actionListener, context); + actionListener = + new TransportActionListener<>(actionRequest, actionListener, context, parentContext); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) diff --git a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v6_0/TransportActionListener.java b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v6_0/TransportActionListener.java index e9e46b77a6..a6aace3774 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v6_0/TransportActionListener.java +++ b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/elasticsearch/transport/v6_0/TransportActionListener.java @@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import io.opentelemetry.instrumentation.api.config.Config; import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; @@ -38,11 +39,16 @@ public class TransportActionListener implements Action private final ActionListener listener; private final Context context; + private final Context parentContext; public TransportActionListener( - ActionRequest actionRequest, ActionListener listener, Context context) { + ActionRequest actionRequest, + ActionListener listener, + Context context, + Context parentContext) { this.listener = listener; this.context = context; + this.parentContext = parentContext; onRequest(actionRequest); } @@ -130,12 +136,16 @@ public class TransportActionListener implements Action } tracer().end(context); - listener.onResponse(response); + try (Scope ignored = parentContext.makeCurrent()) { + listener.onResponse(response); + } } @Override public void onFailure(Exception e) { tracer().endExceptionally(context, e); - listener.onFailure(e); + try (Scope ignored = parentContext.makeCurrent()) { + listener.onFailure(e); + } } } diff --git a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/test/groovy/Elasticsearch6NodeClientTest.groovy b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/test/groovy/Elasticsearch6NodeClientTest.groovy index 3fefd83b51..f5ae7ef221 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/test/groovy/Elasticsearch6NodeClientTest.groovy +++ b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/test/groovy/Elasticsearch6NodeClientTest.groovy @@ -4,20 +4,21 @@ */ import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.StatusCode.ERROR import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest +import org.elasticsearch.client.Client import org.elasticsearch.common.io.FileSystemUtils import org.elasticsearch.common.settings.Settings import org.elasticsearch.index.IndexNotFoundException import org.elasticsearch.node.Node import spock.lang.Shared +import spock.lang.Unroll -class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification { +class Elasticsearch6NodeClientTest extends AbstractElasticsearchNodeClientTest { public static final long TIMEOUT = 10000 // 10 seconds @Shared @@ -27,7 +28,8 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification { @Shared String clusterName = UUID.randomUUID().toString() - def client = testNode.client() + @Shared + Client client def setupSpec() { @@ -44,12 +46,13 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification { .build() testNode = NodeFactory.newNode(settings) testNode.start() + client = testNode.client() runWithSpan("setup") { // this may potentially create multiple requests and therefore multiple spans, so we wrap this call // into a top level trace to get exactly one trace in the result. - testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT) + client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT) // disable periodic refresh in InternalClusterInfoService as it creates spans that tests don't expect - testNode.client().admin().cluster().updateSettings(new ClusterUpdateSettingsRequest().transientSettings(["cluster.routing.allocation.disk.threshold_enabled": false])) + client.admin().cluster().updateSettings(new ClusterUpdateSettingsRequest().transientSettings(["cluster.routing.allocation.disk.threshold_enabled": false])) } waitForTraces(1) } @@ -62,20 +65,32 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification { } } - def "test elasticsearch status"() { - setup: - def result = client.admin().cluster().health(new ClusterHealthRequest()).get() + @Override + Client client() { + client + } - def clusterHealthStatus = result.status + @Unroll + def "test elasticsearch status #callKind"() { + setup: + def clusterHealthStatus = runWithSpan("parent") { + call.call() + } expect: clusterHealthStatus.name() == "GREEN" assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { name "ClusterHealthAction" kind CLIENT + childOf(span(0)) attributes { "${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch" "${SemanticAttributes.DB_OPERATION.key}" "ClusterHealthAction" @@ -83,24 +98,44 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification { "elasticsearch.request" "ClusterHealthRequest" } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } + + where: + callKind | call + "sync" | { clusterHealthSync() } + "async" | { clusterHealthAsync() } } - def "test elasticsearch error"() { + def "test elasticsearch error #callKind"() { when: - client.prepareGet(indexName, indexType, id).get() + runWithSpan("parent") { + call.call(indexName, indexType, id) + } then: thrown IndexNotFoundException and: assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + status ERROR + errorEvent IndexNotFoundException, ~/no such index( \[invalid-index])?/ + kind INTERNAL + hasNoParent() + } + span(1) { name "GetAction" kind CLIENT status ERROR + childOf(span(0)) errorEvent IndexNotFoundException, ~/no such index( \[invalid-index])?/ attributes { "${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch" @@ -110,6 +145,11 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification { "elasticsearch.request.indices" indexName } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } @@ -117,6 +157,9 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification { indexName = "invalid-index" indexType = "test-type" id = "1" + callKind | call + "sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) } + "async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) } } def "test elasticsearch get"() { diff --git a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/test/groovy/Elasticsearch6TransportClientTest.groovy b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/test/groovy/Elasticsearch6TransportClientTest.groovy index 1f22537126..4ce6db2231 100644 --- a/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/test/groovy/Elasticsearch6TransportClientTest.groovy +++ b/instrumentation/elasticsearch/elasticsearch-transport-6.0/javaagent/src/test/groovy/Elasticsearch6TransportClientTest.groovy @@ -4,12 +4,11 @@ */ import static io.opentelemetry.api.trace.SpanKind.CLIENT +import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.StatusCode.ERROR import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest import org.elasticsearch.client.transport.TransportClient import org.elasticsearch.common.io.FileSystemUtils @@ -21,8 +20,9 @@ import org.elasticsearch.transport.RemoteTransportException import org.elasticsearch.transport.TransportService import org.elasticsearch.transport.client.PreBuiltTransportClient import spock.lang.Shared +import spock.lang.Unroll -class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecification { +class Elasticsearch6TransportClientTest extends AbstractElasticsearchTransportClientTest { public static final long TIMEOUT = 10000 // 10 seconds @Shared @@ -78,20 +78,32 @@ class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecificatio } } - def "test elasticsearch status"() { - setup: - def result = client.admin().cluster().health(new ClusterHealthRequest()) + @Override + TransportClient client() { + client + } - def clusterHealthStatus = result.get().status + @Unroll + def "test elasticsearch status #callKind"() { + setup: + def clusterHealthStatus = runWithSpan("parent") { + call.call() + } expect: clusterHealthStatus.name() == "GREEN" assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { name "ClusterHealthAction" kind CLIENT + childOf(span(0)) attributes { "${SemanticAttributes.NET_PEER_NAME.key}" "localhost" "${SemanticAttributes.NET_PEER_IP.key}" tcpPublishAddress.address @@ -102,21 +114,40 @@ class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecificatio "elasticsearch.request" "ClusterHealthRequest" } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } + + where: + callKind | call + "sync" | { clusterHealthSync() } + "async" | { clusterHealthAsync() } } - def "test elasticsearch error"() { + def "test elasticsearch error #callKind"() { when: - client.prepareGet(indexName, indexType, id).get() + runWithSpan("parent") { + call.call(indexName, indexType, id) + } then: thrown IndexNotFoundException and: assertTraces(1) { - trace(0, 1) { + trace(0, 3) { span(0) { + name "parent" + status ERROR + errorEvent IndexNotFoundException, "no such index" + kind INTERNAL + hasNoParent() + } + span(1) { name "GetAction" kind CLIENT status ERROR @@ -129,6 +160,11 @@ class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecificatio "elasticsearch.request.indices" indexName } } + span(2) { + name "callback" + kind INTERNAL + childOf(span(0)) + } } } @@ -136,6 +172,9 @@ class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecificatio indexName = "invalid-index" indexType = "test-type" id = "1" + callKind | call + "sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) } + "async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) } } def "test elasticsearch get"() { diff --git a/instrumentation/elasticsearch/elasticsearch-transport-testing/build.gradle.kts b/instrumentation/elasticsearch/elasticsearch-transport-testing/build.gradle.kts new file mode 100644 index 0000000000..ab67f625ef --- /dev/null +++ b/instrumentation/elasticsearch/elasticsearch-transport-testing/build.gradle.kts @@ -0,0 +1,10 @@ +plugins { + id("otel.java-conventions") +} + +dependencies { + compileOnly("org.elasticsearch.client:transport:5.3.0") + compileOnly("org.elasticsearch:elasticsearch:5.3.0") + + implementation(project(":testing-common")) +} \ No newline at end of file diff --git a/instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchClientTest.groovy b/instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchClientTest.groovy new file mode 100644 index 0000000000..9a2037b746 --- /dev/null +++ b/instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchClientTest.groovy @@ -0,0 +1,65 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification +import io.opentelemetry.instrumentation.test.InstrumentationSpecification +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import org.elasticsearch.action.ActionListener +import org.elasticsearch.transport.RemoteTransportException + +abstract class AbstractElasticsearchClientTest extends AgentInstrumentationSpecification { + + static class Result { + CountDownLatch latch = new CountDownLatch(1) + RESPONSE response + Exception failure + + void setResponse(RESPONSE response) { + this.response = response + latch.countDown() + } + + void setFailure(Exception failure) { + this.failure = failure + latch.countDown() + } + + RESPONSE get() { + latch.await(1, TimeUnit.MINUTES) + if (response != null) { + return response + } + throw failure + } + } + + static class ResultListener implements ActionListener { + final Result result + final InstrumentationSpecification spec + + ResultListener(InstrumentationSpecification spec, Result result) { + this.spec = spec + this.result = result + } + + @Override + void onResponse(T response) { + spec.runWithSpan("callback") { + result.setResponse(response) + } + } + + @Override + void onFailure(Exception e) { + if (e instanceof RemoteTransportException) { + e = e.getCause() + } + spec.runWithSpan("callback") { + result.setFailure(e) + } + } + } +} diff --git a/instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchNodeClientTest.groovy b/instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchNodeClientTest.groovy new file mode 100644 index 0000000000..cdcac0f844 --- /dev/null +++ b/instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchNodeClientTest.groovy @@ -0,0 +1,42 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse +import org.elasticsearch.action.get.GetResponse +import org.elasticsearch.client.Client +import org.elasticsearch.cluster.health.ClusterHealthStatus + +abstract class AbstractElasticsearchNodeClientTest extends AbstractElasticsearchClientTest { + + abstract Client client() + + ClusterHealthStatus clusterHealthSync() { + def result = client().admin().cluster().health(new ClusterHealthRequest()) + return runWithSpan("callback") { + result.get().status + } + } + + ClusterHealthStatus clusterHealthAsync() { + def result = new Result() + client().admin().cluster().health(new ClusterHealthRequest(), new ResultListener(this, result)) + return result.get().status + } + + def prepareGetSync(indexName, indexType, id) { + try { + client().prepareGet(indexName, indexType, id).get() + } finally { + runWithSpan("callback") {} + } + } + + def prepareGetAsync(indexName, indexType, id) { + def result = new Result() + client().prepareGet(indexName, indexType, id).execute(new ResultListener(this, result)) + result.get() + } +} diff --git a/instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchTransportClientTest.groovy b/instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchTransportClientTest.groovy new file mode 100644 index 0000000000..0f6c9fe3fe --- /dev/null +++ b/instrumentation/elasticsearch/elasticsearch-transport-testing/src/main/groovy/AbstractElasticsearchTransportClientTest.groovy @@ -0,0 +1,42 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse +import org.elasticsearch.action.get.GetResponse +import org.elasticsearch.client.transport.TransportClient +import org.elasticsearch.cluster.health.ClusterHealthStatus + +abstract class AbstractElasticsearchTransportClientTest extends AbstractElasticsearchClientTest { + + abstract TransportClient client() + + ClusterHealthStatus clusterHealthSync() { + def result = client().admin().cluster().health(new ClusterHealthRequest()) + return runWithSpan("callback") { + result.get().status + } + } + + ClusterHealthStatus clusterHealthAsync() { + def result = new Result() + client().admin().cluster().health(new ClusterHealthRequest(), new ResultListener(this, result)) + return result.get().status + } + + def prepareGetSync(indexName, indexType, id) { + try { + client().prepareGet(indexName, indexType, id).get() + } finally { + runWithSpan("callback") {} + } + } + + def prepareGetAsync(indexName, indexType, id) { + def result = new Result() + client().prepareGet(indexName, indexType, id).execute(new ResultListener(this, result)) + result.get() + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index d6e5753830..c948f901df 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -135,6 +135,7 @@ include(":instrumentation:elasticsearch:elasticsearch-rest-5.0:javaagent") include(":instrumentation:elasticsearch:elasticsearch-rest-6.4:javaagent") include(":instrumentation:elasticsearch:elasticsearch-rest-7.0:javaagent") include(":instrumentation:elasticsearch:elasticsearch-transport-common:library") +include(":instrumentation:elasticsearch:elasticsearch-transport-testing") include(":instrumentation:elasticsearch:elasticsearch-transport-5.0:javaagent") include(":instrumentation:elasticsearch:elasticsearch-transport-5.3:javaagent") include(":instrumentation:elasticsearch:elasticsearch-transport-6.0:javaagent")