Context propagation to elasticsearch-transport callbacks (#3861)

This commit is contained in:
Lauri Tulmin 2021-08-19 12:59:47 +03:00 committed by GitHub
parent f05c3788e1
commit 514553eac2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 536 additions and 89 deletions

View File

@ -33,6 +33,7 @@ dependencies {
testInstrumentation(project(":instrumentation:apache-httpasyncclient-4.1:javaagent"))
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
testImplementation(project(":instrumentation:elasticsearch:elasticsearch-transport-testing"))
testImplementation("org.apache.logging.log4j:log4j-core:2.11.0")
testImplementation("org.apache.logging.log4j:log4j-api:2.11.0")

View File

@ -54,11 +54,13 @@ public class AbstractClientInstrumentation implements TypeInstrumentation {
@Advice.Argument(value = 2, readOnly = false)
ActionListener<ActionResponse> actionListener) {
context = tracer().startSpan(currentContext(), null, action);
Context parentContext = currentContext();
context = tracer().startSpan(parentContext, null, action);
scope = context.makeCurrent();
tracer().onRequest(context, action.getClass(), actionRequest.getClass());
actionListener = new TransportActionListener<>(actionRequest, actionListener, context);
actionListener =
new TransportActionListener<>(actionRequest, actionListener, context, parentContext);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
@ -34,11 +35,16 @@ public class TransportActionListener<T extends ActionResponse> implements Action
private final ActionListener<T> listener;
private final Context context;
private final Context parentContext;
public TransportActionListener(
ActionRequest<?> actionRequest, ActionListener<T> listener, Context context) {
ActionRequest<?> actionRequest,
ActionListener<T> listener,
Context context,
Context parentContext) {
this.listener = listener;
this.context = context;
this.parentContext = parentContext;
onRequest(actionRequest);
}
@ -124,12 +130,16 @@ public class TransportActionListener<T extends ActionResponse> implements Action
}
tracer().end(context);
listener.onResponse(response);
try (Scope ignored = parentContext.makeCurrent()) {
listener.onResponse(response);
}
}
@Override
public void onFailure(Exception e) {
tracer().endExceptionally(context, e);
listener.onFailure(e);
try (Scope ignored = parentContext.makeCurrent()) {
listener.onFailure(e);
}
}
}

View File

@ -4,12 +4,12 @@
*/
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.client.Client
import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.env.Environment
@ -18,8 +18,9 @@ import org.elasticsearch.node.Node
import org.elasticsearch.node.internal.InternalSettingsPreparer
import org.elasticsearch.transport.Netty3Plugin
import spock.lang.Shared
import spock.lang.Unroll
class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification {
class Elasticsearch5NodeClientTest extends AbstractElasticsearchNodeClientTest {
public static final long TIMEOUT = 10000 // 10 seconds
@Shared
@ -29,7 +30,8 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification {
@Shared
String clusterName = UUID.randomUUID().toString()
def client = testNode.client()
@Shared
Client client
def setupSpec() {
@ -48,10 +50,11 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification {
.build()
testNode = new Node(new Environment(InternalSettingsPreparer.prepareSettings(settings)), [Netty3Plugin])
testNode.start()
client = testNode.client()
runWithSpan("setup") {
// this may potentially create multiple requests and therefore multiple spans, so we wrap this call
// into a top level trace to get exactly one trace in the result.
testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT)
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT)
}
ignoreTracesAndClear(1)
}
@ -64,20 +67,32 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification {
}
}
def "test elasticsearch status"() {
setup:
def result = client.admin().cluster().health(new ClusterHealthRequest())
@Override
Client client() {
client
}
def clusterHealthStatus = result.get().status
@Unroll
def "test elasticsearch status #callKind"() {
setup:
def clusterHealthStatus = runWithSpan("parent") {
call.call()
}
expect:
clusterHealthStatus.name() == "GREEN"
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "ClusterHealthAction"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
"${SemanticAttributes.DB_OPERATION.key}" "ClusterHealthAction"
@ -85,25 +100,46 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification {
"elasticsearch.request" "ClusterHealthRequest"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
where:
callKind | call
"sync" | { clusterHealthSync() }
"async" | { clusterHealthAsync() }
}
def "test elasticsearch error"() {
@Unroll
def "test elasticsearch error #callKind"() {
when:
client.prepareGet(indexName, indexType, id).get()
runWithSpan("parent") {
call.call(indexName, indexType, id)
}
then:
thrown IndexNotFoundException
and:
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
status ERROR
errorEvent IndexNotFoundException, "no such index"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GetAction"
status ERROR
errorEvent IndexNotFoundException, "no such index"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
"${SemanticAttributes.DB_OPERATION.key}" "GetAction"
@ -112,6 +148,11 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification {
"elasticsearch.request.indices" indexName
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
@ -119,6 +160,9 @@ class Elasticsearch5NodeClientTest extends AgentInstrumentationSpecification {
indexName = "invalid-index"
indexType = "test-type"
id = "1"
callKind | call
"sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) }
"async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) }
}
def "test elasticsearch get"() {

View File

@ -4,12 +4,11 @@
*/
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
@ -23,8 +22,9 @@ import org.elasticsearch.transport.RemoteTransportException
import org.elasticsearch.transport.TransportService
import org.elasticsearch.transport.client.PreBuiltTransportClient
import spock.lang.Shared
import spock.lang.Unroll
class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecification {
class Elasticsearch5TransportClientTest extends AbstractElasticsearchTransportClientTest {
public static final long TIMEOUT = 10000 // 10 seconds
@Shared
@ -80,18 +80,29 @@ class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecificatio
}
}
def "test elasticsearch status"() {
setup:
def result = client.admin().cluster().health(new ClusterHealthRequest())
@Override
TransportClient client() {
client
}
def clusterHealthStatus = result.get().status
@Unroll
def "test elasticsearch status #callKind"() {
setup:
def clusterHealthStatus = runWithSpan("parent") {
call.call()
}
expect:
clusterHealthStatus.name() == "GREEN"
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "ClusterHealthAction"
kind CLIENT
attributes {
@ -104,24 +115,44 @@ class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecificatio
"elasticsearch.request" "ClusterHealthRequest"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
where:
callKind | call
"sync" | { clusterHealthSync() }
"async" | { clusterHealthAsync() }
}
def "test elasticsearch error"() {
def "test elasticsearch error #callKind"() {
when:
client.prepareGet(indexName, indexType, id).get()
runWithSpan("parent") {
call.call(indexName, indexType, id)
}
then:
thrown IndexNotFoundException
and:
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
status ERROR
errorEvent IndexNotFoundException, "no such index"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GetAction"
kind CLIENT
status ERROR
childOf(span(0))
errorEvent RemoteTransportException, String
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
@ -131,6 +162,11 @@ class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecificatio
"elasticsearch.request.indices" indexName
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
@ -138,6 +174,9 @@ class Elasticsearch5TransportClientTest extends AgentInstrumentationSpecificatio
indexName = "invalid-index"
indexType = "test-type"
id = "1"
callKind | call
"sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) }
"async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) }
}
def "test elasticsearch get"() {

View File

@ -38,6 +38,7 @@ dependencies {
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
testInstrumentation(project(":instrumentation:spring:spring-data-1.8:javaagent"))
testImplementation(project(":instrumentation:elasticsearch:elasticsearch-transport-testing"))
testImplementation("org.apache.logging.log4j:log4j-core:2.11.0")
testImplementation("org.apache.logging.log4j:log4j-api:2.11.0")

View File

@ -54,11 +54,13 @@ public class AbstractClientInstrumentation implements TypeInstrumentation {
@Advice.Argument(value = 2, readOnly = false)
ActionListener<ActionResponse> actionListener) {
context = tracer().startSpan(currentContext(), null, action);
Context parentContext = currentContext();
context = tracer().startSpan(parentContext, null, action);
scope = context.makeCurrent();
tracer().onRequest(context, action.getClass(), actionRequest.getClass());
actionListener = new TransportActionListener<>(actionRequest, actionListener, context);
actionListener =
new TransportActionListener<>(actionRequest, actionListener, context, parentContext);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
@ -34,11 +35,16 @@ public class TransportActionListener<T extends ActionResponse> implements Action
private final ActionListener<T> listener;
private final Context context;
private final Context parentContext;
public TransportActionListener(
ActionRequest actionRequest, ActionListener<T> listener, Context context) {
ActionRequest actionRequest,
ActionListener<T> listener,
Context context,
Context parentContext) {
this.listener = listener;
this.context = context;
this.parentContext = parentContext;
onRequest(actionRequest);
}
@ -125,12 +131,16 @@ public class TransportActionListener<T extends ActionResponse> implements Action
}
tracer().end(context);
listener.onResponse(response);
try (Scope ignored = parentContext.makeCurrent()) {
listener.onResponse(response);
}
}
@Override
public void onFailure(Exception e) {
tracer().endExceptionally(context, e);
listener.onFailure(e);
try (Scope ignored = parentContext.makeCurrent()) {
listener.onFailure(e);
}
}
}

View File

@ -4,13 +4,13 @@
*/
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.elasticsearch.client.Client
import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.env.Environment
@ -19,8 +19,9 @@ import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node
import org.elasticsearch.transport.Netty3Plugin
import spock.lang.Shared
import spock.lang.Unroll
class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification {
class Elasticsearch53NodeClientTest extends AbstractElasticsearchNodeClientTest {
public static final long TIMEOUT = 10000 // 10 seconds
@Shared
@ -30,7 +31,8 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification {
@Shared
String clusterName = UUID.randomUUID().toString()
def client = testNode.client()
@Shared
Client client
def setupSpec() {
@ -49,12 +51,13 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification {
.build()
testNode = new Node(new Environment(InternalSettingsPreparer.prepareSettings(settings)), [Netty3Plugin])
testNode.start()
client = testNode.client()
runWithSpan("setup") {
// this may potentially create multiple requests and therefore multiple spans, so we wrap this call
// into a top level trace to get exactly one trace in the result.
testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT)
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT)
// disable periodic refresh in InternalClusterInfoService as it creates spans that tests don't expect
testNode.client().admin().cluster().updateSettings(new ClusterUpdateSettingsRequest().transientSettings(["cluster.routing.allocation.disk.threshold_enabled": false]))
client.admin().cluster().updateSettings(new ClusterUpdateSettingsRequest().transientSettings(["cluster.routing.allocation.disk.threshold_enabled": false]))
}
ignoreTracesAndClear(1)
}
@ -67,20 +70,32 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification {
}
}
def "test elasticsearch status"() {
setup:
def result = client.admin().cluster().health(new ClusterHealthRequest())
@Override
Client client() {
client
}
def clusterHealthStatus = result.get().status
@Unroll
def "test elasticsearch status #callKind"() {
setup:
def clusterHealthStatus = runWithSpan("parent") {
call.call()
}
expect:
clusterHealthStatus.name() == "GREEN"
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "ClusterHealthAction"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
"${SemanticAttributes.DB_OPERATION.key}" "ClusterHealthAction"
@ -88,24 +103,45 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification {
"elasticsearch.request" "ClusterHealthRequest"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
where:
callKind | call
"sync" | { clusterHealthSync() }
"async" | { clusterHealthAsync() }
}
def "test elasticsearch error"() {
@Unroll
def "test elasticsearch error #callKind"() {
when:
client.prepareGet(indexName, indexType, id).get()
runWithSpan("parent") {
call.call(indexName, indexType, id)
}
then:
thrown IndexNotFoundException
and:
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
status ERROR
errorEvent IndexNotFoundException, "no such index"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GetAction"
kind CLIENT
status ERROR
childOf(span(0))
errorEvent IndexNotFoundException, "no such index"
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
@ -115,6 +151,11 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification {
"elasticsearch.request.indices" indexName
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
@ -122,6 +163,9 @@ class Elasticsearch53NodeClientTest extends AgentInstrumentationSpecification {
indexName = "invalid-index"
indexType = "test-type"
id = "1"
callKind | call
"sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) }
"async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) }
}
def "test elasticsearch get"() {

View File

@ -4,12 +4,11 @@
*/
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.io.FileSystemUtils
@ -24,8 +23,9 @@ import org.elasticsearch.transport.RemoteTransportException
import org.elasticsearch.transport.TransportService
import org.elasticsearch.transport.client.PreBuiltTransportClient
import spock.lang.Shared
import spock.lang.Unroll
class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecification {
class Elasticsearch53TransportClientTest extends AbstractElasticsearchTransportClientTest {
public static final long TIMEOUT = 10000 // 10 seconds
@Shared
@ -85,20 +85,32 @@ class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecificati
}
}
def "test elasticsearch status"() {
setup:
def result = client.admin().cluster().health(new ClusterHealthRequest())
@Override
TransportClient client() {
client
}
def clusterHealthStatus = result.get().status
@Unroll
def "test elasticsearch status #callKind"() {
setup:
def clusterHealthStatus = runWithSpan("parent") {
call.call()
}
expect:
clusterHealthStatus.name() == "GREEN"
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "ClusterHealthAction"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.NET_PEER_NAME.key}" tcpPublishAddress.host == tcpPublishAddress.address ? null : tcpPublishAddress.address
"${SemanticAttributes.NET_PEER_IP.key}" tcpPublishAddress.address
@ -109,21 +121,40 @@ class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecificati
"elasticsearch.request" "ClusterHealthRequest"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
where:
callKind | call
"sync" | { clusterHealthSync() }
"async" | { clusterHealthAsync() }
}
def "test elasticsearch error"() {
def "test elasticsearch error #callKind"() {
when:
client.prepareGet(indexName, indexType, id).get()
runWithSpan("parent") {
call.call(indexName, indexType, id)
}
then:
thrown IndexNotFoundException
and:
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
status ERROR
errorEvent IndexNotFoundException, "no such index"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GetAction"
kind CLIENT
status ERROR
@ -136,6 +167,11 @@ class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecificati
"elasticsearch.request.indices" indexName
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
@ -143,6 +179,9 @@ class Elasticsearch53TransportClientTest extends AgentInstrumentationSpecificati
indexName = "invalid-index"
indexType = "test-type"
id = "1"
callKind | call
"sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) }
"async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) }
}
def "test elasticsearch get"() {

View File

@ -35,6 +35,7 @@ dependencies {
testLibrary("org.elasticsearch.plugin:transport-netty4-client:6.0.0")
testImplementation(project(":instrumentation:elasticsearch:elasticsearch-transport-testing"))
testImplementation("org.apache.logging.log4j:log4j-core:2.11.0")
testImplementation("org.apache.logging.log4j:log4j-api:2.11.0")
}

View File

@ -58,10 +58,12 @@ public class AbstractClientInstrumentation implements TypeInstrumentation {
@Advice.Argument(value = 2, readOnly = false)
ActionListener<ActionResponse> actionListener) {
context = tracer().startSpan(currentContext(), null, action);
Context parentContext = currentContext();
context = tracer().startSpan(parentContext, null, action);
scope = context.makeCurrent();
tracer().onRequest(context, action.getClass(), actionRequest.getClass());
actionListener = new TransportActionListener<>(actionRequest, actionListener, context);
actionListener =
new TransportActionListener<>(actionRequest, actionListener, context, parentContext);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.instrumentation.elasticsearch.transport
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.config.Config;
import io.opentelemetry.instrumentation.api.tracer.net.NetPeerAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
@ -38,11 +39,16 @@ public class TransportActionListener<T extends ActionResponse> implements Action
private final ActionListener<T> listener;
private final Context context;
private final Context parentContext;
public TransportActionListener(
ActionRequest actionRequest, ActionListener<T> listener, Context context) {
ActionRequest actionRequest,
ActionListener<T> listener,
Context context,
Context parentContext) {
this.listener = listener;
this.context = context;
this.parentContext = parentContext;
onRequest(actionRequest);
}
@ -130,12 +136,16 @@ public class TransportActionListener<T extends ActionResponse> implements Action
}
tracer().end(context);
listener.onResponse(response);
try (Scope ignored = parentContext.makeCurrent()) {
listener.onResponse(response);
}
}
@Override
public void onFailure(Exception e) {
tracer().endExceptionally(context, e);
listener.onFailure(e);
try (Scope ignored = parentContext.makeCurrent()) {
listener.onFailure(e);
}
}
}

View File

@ -4,20 +4,21 @@
*/
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.elasticsearch.client.Client
import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.node.Node
import spock.lang.Shared
import spock.lang.Unroll
class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification {
class Elasticsearch6NodeClientTest extends AbstractElasticsearchNodeClientTest {
public static final long TIMEOUT = 10000 // 10 seconds
@Shared
@ -27,7 +28,8 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification {
@Shared
String clusterName = UUID.randomUUID().toString()
def client = testNode.client()
@Shared
Client client
def setupSpec() {
@ -44,12 +46,13 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification {
.build()
testNode = NodeFactory.newNode(settings)
testNode.start()
client = testNode.client()
runWithSpan("setup") {
// this may potentially create multiple requests and therefore multiple spans, so we wrap this call
// into a top level trace to get exactly one trace in the result.
testNode.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT)
client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet(TIMEOUT)
// disable periodic refresh in InternalClusterInfoService as it creates spans that tests don't expect
testNode.client().admin().cluster().updateSettings(new ClusterUpdateSettingsRequest().transientSettings(["cluster.routing.allocation.disk.threshold_enabled": false]))
client.admin().cluster().updateSettings(new ClusterUpdateSettingsRequest().transientSettings(["cluster.routing.allocation.disk.threshold_enabled": false]))
}
waitForTraces(1)
}
@ -62,20 +65,32 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification {
}
}
def "test elasticsearch status"() {
setup:
def result = client.admin().cluster().health(new ClusterHealthRequest()).get()
@Override
Client client() {
client
}
def clusterHealthStatus = result.status
@Unroll
def "test elasticsearch status #callKind"() {
setup:
def clusterHealthStatus = runWithSpan("parent") {
call.call()
}
expect:
clusterHealthStatus.name() == "GREEN"
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "ClusterHealthAction"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
"${SemanticAttributes.DB_OPERATION.key}" "ClusterHealthAction"
@ -83,24 +98,44 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification {
"elasticsearch.request" "ClusterHealthRequest"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
where:
callKind | call
"sync" | { clusterHealthSync() }
"async" | { clusterHealthAsync() }
}
def "test elasticsearch error"() {
def "test elasticsearch error #callKind"() {
when:
client.prepareGet(indexName, indexType, id).get()
runWithSpan("parent") {
call.call(indexName, indexType, id)
}
then:
thrown IndexNotFoundException
and:
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
status ERROR
errorEvent IndexNotFoundException, ~/no such index( \[invalid-index])?/
kind INTERNAL
hasNoParent()
}
span(1) {
name "GetAction"
kind CLIENT
status ERROR
childOf(span(0))
errorEvent IndexNotFoundException, ~/no such index( \[invalid-index])?/
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
@ -110,6 +145,11 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification {
"elasticsearch.request.indices" indexName
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
@ -117,6 +157,9 @@ class Elasticsearch6NodeClientTest extends AgentInstrumentationSpecification {
indexName = "invalid-index"
indexType = "test-type"
id = "1"
callKind | call
"sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) }
"async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) }
}
def "test elasticsearch get"() {

View File

@ -4,12 +4,11 @@
*/
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.io.FileSystemUtils
@ -21,8 +20,9 @@ import org.elasticsearch.transport.RemoteTransportException
import org.elasticsearch.transport.TransportService
import org.elasticsearch.transport.client.PreBuiltTransportClient
import spock.lang.Shared
import spock.lang.Unroll
class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecification {
class Elasticsearch6TransportClientTest extends AbstractElasticsearchTransportClientTest {
public static final long TIMEOUT = 10000 // 10 seconds
@Shared
@ -78,20 +78,32 @@ class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecificatio
}
}
def "test elasticsearch status"() {
setup:
def result = client.admin().cluster().health(new ClusterHealthRequest())
@Override
TransportClient client() {
client
}
def clusterHealthStatus = result.get().status
@Unroll
def "test elasticsearch status #callKind"() {
setup:
def clusterHealthStatus = runWithSpan("parent") {
call.call()
}
expect:
clusterHealthStatus.name() == "GREEN"
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "ClusterHealthAction"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_IP.key}" tcpPublishAddress.address
@ -102,21 +114,40 @@ class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecificatio
"elasticsearch.request" "ClusterHealthRequest"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
where:
callKind | call
"sync" | { clusterHealthSync() }
"async" | { clusterHealthAsync() }
}
def "test elasticsearch error"() {
def "test elasticsearch error #callKind"() {
when:
client.prepareGet(indexName, indexType, id).get()
runWithSpan("parent") {
call.call(indexName, indexType, id)
}
then:
thrown IndexNotFoundException
and:
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
status ERROR
errorEvent IndexNotFoundException, "no such index"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GetAction"
kind CLIENT
status ERROR
@ -129,6 +160,11 @@ class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecificatio
"elasticsearch.request.indices" indexName
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
@ -136,6 +172,9 @@ class Elasticsearch6TransportClientTest extends AgentInstrumentationSpecificatio
indexName = "invalid-index"
indexType = "test-type"
id = "1"
callKind | call
"sync" | { indexName, indexType, id -> prepareGetSync(indexName, indexType, id) }
"async" | { indexName, indexType, id -> prepareGetAsync(indexName, indexType, id) }
}
def "test elasticsearch get"() {

View File

@ -0,0 +1,10 @@
plugins {
id("otel.java-conventions")
}
dependencies {
compileOnly("org.elasticsearch.client:transport:5.3.0")
compileOnly("org.elasticsearch:elasticsearch:5.3.0")
implementation(project(":testing-common"))
}

View File

@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import org.elasticsearch.action.ActionListener
import org.elasticsearch.transport.RemoteTransportException
abstract class AbstractElasticsearchClientTest extends AgentInstrumentationSpecification {
static class Result<RESPONSE> {
CountDownLatch latch = new CountDownLatch(1)
RESPONSE response
Exception failure
void setResponse(RESPONSE response) {
this.response = response
latch.countDown()
}
void setFailure(Exception failure) {
this.failure = failure
latch.countDown()
}
RESPONSE get() {
latch.await(1, TimeUnit.MINUTES)
if (response != null) {
return response
}
throw failure
}
}
static class ResultListener<T> implements ActionListener<T> {
final Result<T> result
final InstrumentationSpecification spec
ResultListener(InstrumentationSpecification spec, Result<T> result) {
this.spec = spec
this.result = result
}
@Override
void onResponse(T response) {
spec.runWithSpan("callback") {
result.setResponse(response)
}
}
@Override
void onFailure(Exception e) {
if (e instanceof RemoteTransportException) {
e = e.getCause()
}
spec.runWithSpan("callback") {
result.setFailure(e)
}
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.health.ClusterHealthStatus
abstract class AbstractElasticsearchNodeClientTest extends AbstractElasticsearchClientTest {
abstract Client client()
ClusterHealthStatus clusterHealthSync() {
def result = client().admin().cluster().health(new ClusterHealthRequest())
return runWithSpan("callback") {
result.get().status
}
}
ClusterHealthStatus clusterHealthAsync() {
def result = new Result<ClusterHealthResponse>()
client().admin().cluster().health(new ClusterHealthRequest(), new ResultListener<ClusterHealthResponse>(this, result))
return result.get().status
}
def prepareGetSync(indexName, indexType, id) {
try {
client().prepareGet(indexName, indexType, id).get()
} finally {
runWithSpan("callback") {}
}
}
def prepareGetAsync(indexName, indexType, id) {
def result = new Result<GetResponse>()
client().prepareGet(indexName, indexType, id).execute(new ResultListener<GetResponse>(this, result))
result.get()
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.cluster.health.ClusterHealthStatus
abstract class AbstractElasticsearchTransportClientTest extends AbstractElasticsearchClientTest {
abstract TransportClient client()
ClusterHealthStatus clusterHealthSync() {
def result = client().admin().cluster().health(new ClusterHealthRequest())
return runWithSpan("callback") {
result.get().status
}
}
ClusterHealthStatus clusterHealthAsync() {
def result = new Result<ClusterHealthResponse>()
client().admin().cluster().health(new ClusterHealthRequest(), new ResultListener<ClusterHealthResponse>(this, result))
return result.get().status
}
def prepareGetSync(indexName, indexType, id) {
try {
client().prepareGet(indexName, indexType, id).get()
} finally {
runWithSpan("callback") {}
}
}
def prepareGetAsync(indexName, indexType, id) {
def result = new Result<GetResponse>()
client().prepareGet(indexName, indexType, id).execute(new ResultListener<GetResponse>(this, result))
result.get()
}
}

View File

@ -135,6 +135,7 @@ include(":instrumentation:elasticsearch:elasticsearch-rest-5.0:javaagent")
include(":instrumentation:elasticsearch:elasticsearch-rest-6.4:javaagent")
include(":instrumentation:elasticsearch:elasticsearch-rest-7.0:javaagent")
include(":instrumentation:elasticsearch:elasticsearch-transport-common:library")
include(":instrumentation:elasticsearch:elasticsearch-transport-testing")
include(":instrumentation:elasticsearch:elasticsearch-transport-5.0:javaagent")
include(":instrumentation:elasticsearch:elasticsearch-transport-5.3:javaagent")
include(":instrumentation:elasticsearch:elasticsearch-transport-6.0:javaagent")