Add support for elasticsearch 7 rest client (#2528)

* Add support for elasticsearch 7 rest client

* remove commented out line

* Trigger Build

* exclude bad version from muzzle

* imrove muzzle checks

* Trigger Build

* Add comment
This commit is contained in:
Lauri Tulmin 2021-03-11 09:03:24 +02:00 committed by GitHub
parent 97fa9932d4
commit 14dcd14871
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 287 additions and 84 deletions

View File

@ -18,6 +18,7 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.RestResponseListener;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;
@ -84,6 +85,7 @@ public class Elasticsearch5RestClientInstrumentationModule extends Instrumentati
if (throwable != null) {
tracer().endExceptionally(context, throwable);
}
// span ended in RestResponseListener
}
}
}

View File

@ -1,11 +1,10 @@
apply from: "$rootDir/gradle/instrumentation.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
muzzle {
pass {
group = "org.elasticsearch.client"
module = "elasticsearch-rest-client"
versions = "[6.4,)"
versions = "[6.4,7.0)"
assertInverse = true
}
@ -16,12 +15,8 @@ muzzle {
}
}
testSets {
latestDepTest
}
dependencies {
compileOnly group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.4.0'
library group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.4.0'
implementation project(':instrumentation:elasticsearch:elasticsearch-rest-common:javaagent')
@ -34,13 +29,11 @@ dependencies {
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.0'
testImplementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.4.0'
testImplementation group: 'org.elasticsearch', name: 'elasticsearch', version: '6.4.0'
testImplementation group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.4.0'
testLibrary group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.4.0'
testLibrary group: 'org.elasticsearch', name: 'elasticsearch', version: '6.4.0'
testLibrary group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.4.0'
// TODO: The tests are incompatible with 7.x. The instrumentation may be as well.
latestDepTestImplementation group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.+'
latestDepTestImplementation group: 'org.elasticsearch.client', name: 'transport', version: '6.+'
latestDepTestImplementation group: 'org.elasticsearch', name: 'elasticsearch', version: '6.+'
latestDepTestImplementation group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.+'
latestDepTestLibrary group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.+'
latestDepTestLibrary group: 'org.elasticsearch', name: 'elasticsearch', version: '6.+'
latestDepTestLibrary group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.+'
}

View File

@ -7,16 +7,19 @@ package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.v6_4;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchRestClientTracer.tracer;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.RestResponseListener;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;
@ -34,6 +37,12 @@ public class Elasticsearch6RestClientInstrumentationModule extends Instrumentati
super("elasticsearch-rest", "elasticsearch-rest-6.0", "elasticsearch");
}
@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
// class introduced in 7.0.0
return not(hasClassesNamed("org.elasticsearch.client.RestClient$InternalRequest"));
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new RestClientInstrumentation());
@ -84,6 +93,7 @@ public class Elasticsearch6RestClientInstrumentationModule extends Instrumentati
if (throwable != null) {
tracer().endExceptionally(context, throwable);
}
// span ended in RestResponseListener
}
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.v6_4;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchRestClientTracer.tracer;
import io.opentelemetry.context.Context;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
public class RestResponseListener implements ResponseListener {
private final ResponseListener listener;
private final Context context;
public RestResponseListener(ResponseListener listener, Context context) {
this.listener = listener;
this.context = context;
}
@Override
public void onSuccess(Response response) {
if (response.getHost() != null) {
tracer().onResponse(context, response);
}
tracer().end(context);
listener.onSuccess(response);
}
@Override
public void onFailure(Exception e) {
tracer().endExceptionally(context, e);
listener.onFailure(e);
}
}

View File

@ -18,9 +18,7 @@ import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.http.HttpServerTransport
import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node
import org.elasticsearch.transport.Netty4Plugin
import spock.lang.Shared
class Elasticsearch6RestClientTest extends AgentInstrumentationSpecification {
@ -46,7 +44,7 @@ class Elasticsearch6RestClientTest extends AgentInstrumentationSpecification {
.put("path.home", esWorkingDir.path)
.put("cluster.name", clusterName)
.build()
testNode = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin])
testNode = NodeFactory.newNode(settings)
testNode.start()
httpTransportAddress = testNode.injector().getInstance(HttpServerTransport).boundAddress().publishAddress()

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node
import org.elasticsearch.transport.Netty4Plugin
class NodeFactory {
static Node newNode(Settings settings) {
def version = org.elasticsearch.Version.CURRENT
if (version.major == 6 && version.minor >= 5) {
return new NodeV65(settings)
}
return new NodeV6(settings)
}
static class NodeV6 extends Node {
NodeV6(Settings settings) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin])
}
protected void registerDerivedNodeNameWithLogger(String s) {
}
}
static class NodeV65 extends Node {
NodeV65(Settings settings) {
super(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin], true)
}
protected void registerDerivedNodeNameWithLogger(String s) {
}
}
}

View File

@ -0,0 +1,34 @@
apply from: "$rootDir/gradle/instrumentation.gradle"
muzzle {
pass {
group = "org.elasticsearch.client"
module = "elasticsearch-rest-client"
versions = "[7.0,)"
assertInverse = true
}
fail {
group = "org.elasticsearch.client"
module = "rest"
versions = "(,)"
}
}
dependencies {
library group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '7.0.0'
implementation project(':instrumentation:elasticsearch:elasticsearch-rest-common:javaagent')
testInstrumentation project(':instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent')
testInstrumentation project(':instrumentation:apache-httpasyncclient-4.1:javaagent')
//TODO: review the following claim, we are not using embedded ES anymore
// Netty is used, but it adds complexity to the tests since we're using embedded ES.
//testInstrumentation project(':instrumentation:netty:netty-4.1:javaagent')
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.0'
testLibrary group: 'org.elasticsearch', name: 'elasticsearch', version: '7.0.0'
testLibrary group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '7.0.0'
}

View File

@ -0,0 +1,139 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.v7_0;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchRestClientTracer.tracer;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonList;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.RestResponseListener;
import io.opentelemetry.javaagent.tooling.InstrumentationModule;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
@AutoService(InstrumentationModule.class)
public class Elasticsearch7RestClientInstrumentationModule extends InstrumentationModule {
public Elasticsearch7RestClientInstrumentationModule() {
super("elasticsearch-rest", "elasticsearch-rest-7.0", "elasticsearch");
}
@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
// class introduced in 7.0.0
return hasClassesNamed("org.elasticsearch.client.RestClient$InternalRequest");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new RestClientInstrumentation());
}
public static class RestClientInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.elasticsearch.client.RestClient");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
Map<ElementMatcher<MethodDescription>, String> transformers = new HashMap<>();
transformers.put(
isMethod()
.and(named("performRequest"))
.and(takesArguments(1))
.and(takesArgument(0, named("org.elasticsearch.client.Request"))),
Elasticsearch7RestClientInstrumentationModule.class.getName() + "$PerformRequestAdvice");
transformers.put(
isMethod()
.and(named("performRequestAsync"))
.and(takesArguments(2))
.and(takesArgument(0, named("org.elasticsearch.client.Request")))
.and(takesArgument(1, named("org.elasticsearch.client.ResponseListener"))),
Elasticsearch7RestClientInstrumentationModule.class.getName()
+ "$PerformRequestAsyncAdvice");
return transformers;
}
}
public static class PerformRequestAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) Request request,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
context =
tracer()
.startSpan(currentContext(), null, request.getMethod() + " " + request.getEndpoint());
scope = context.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Return(readOnly = false) Response response,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(context, throwable);
} else {
if (response != null) {
tracer().onResponse(context, response);
}
tracer().end(context);
}
}
}
public static class PerformRequestAsyncAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) Request request,
@Advice.Argument(value = 1, readOnly = false) ResponseListener responseListener,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
context =
tracer()
.startSpan(currentContext(), null, request.getMethod() + " " + request.getEndpoint());
scope = context.makeCurrent();
responseListener = new RestResponseListener(responseListener, context);
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
scope.close();
if (throwable != null) {
tracer().endExceptionally(context, throwable);
}
// span ended in RestResponseListener
}
}
}

View File

@ -8,37 +8,36 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT
import groovy.json.JsonSlurper
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CountDownLatch
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.Request
import org.elasticsearch.client.Response
import org.elasticsearch.client.ResponseListener
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder
import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.env.Environment
import org.elasticsearch.http.HttpServerTransport
import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node
import org.elasticsearch.plugins.Plugin
import org.elasticsearch.transport.Netty4Plugin
import spock.lang.Shared
class Elasticsearch6RestClientTest extends AgentInstrumentationSpecification {
class Elasticsearch7RestClientTest extends AgentInstrumentationSpecification {
@Shared
TransportAddress httpTransportAddress
@Shared
Node testNode
@Shared
File esWorkingDir
@Shared
String clusterName = UUID.randomUUID().toString()
@Shared
RestClient client
@Shared
String clusterName = UUID.randomUUID().toString()
def setupSpec() {
@ -50,12 +49,11 @@ class Elasticsearch6RestClientTest extends AgentInstrumentationSpecification {
.put("path.home", esWorkingDir.path)
.put("cluster.name", clusterName)
.build()
testNode = new TestNode(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin])
testNode = new Node(InternalSettingsPreparer.prepareEnvironment(settings, Collections.emptyMap(), null, { "default node name" }), [Netty4Plugin], true)
testNode.start()
httpTransportAddress = testNode.injector().getInstance(HttpServerTransport).boundAddress().publishAddress()
client = RestClient.builder(new HttpHost(httpTransportAddress.address, httpTransportAddress.port))
.setMaxRetryTimeoutMillis(Integer.MAX_VALUE)
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
@ -76,8 +74,7 @@ class Elasticsearch6RestClientTest extends AgentInstrumentationSpecification {
def "test elasticsearch status"() {
setup:
Request request = new Request("GET", "_cluster/health")
Response response = client.performRequest(request)
Response response = client.performRequest(new Request("GET", "_cluster/health"))
Map result = new JsonSlurper().parseText(EntityUtils.toString(response.entity))
@ -101,13 +98,50 @@ class Elasticsearch6RestClientTest extends AgentInstrumentationSpecification {
}
}
static class TestNode extends Node {
TestNode(Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
super(environment, classpathPlugins, false)
}
def "test elasticsearch status async"() {
setup:
Response requestResponse = null
Exception exception = null
CountDownLatch countDownLatch = new CountDownLatch(1)
ResponseListener responseListener = new ResponseListener() {
@Override
void onSuccess(Response response) {
requestResponse = response
countDownLatch.countDown()
}
@Override
protected void registerDerivedNodeNameWithLogger(String nodeName) {}
@Override
void onFailure(Exception e) {
exception = e
countDownLatch.countDown()
}
}
client.performRequestAsync(new Request("GET", "_cluster/health"), responseListener)
countDownLatch.await()
if (exception != null) {
throw exception
}
Map result = new JsonSlurper().parseText(EntityUtils.toString(requestResponse.entity))
expect:
result.status == "green"
assertTraces(1) {
trace(0, 1) {
span(0) {
name "GET _cluster/health"
kind CLIENT
hasNoParent()
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "elasticsearch"
"${SemanticAttributes.DB_OPERATION.key}" "GET _cluster/health"
"${SemanticAttributes.NET_PEER_NAME.key}" httpTransportAddress.address
"${SemanticAttributes.NET_PEER_PORT.key}" httpTransportAddress.port
}
}
}
}
}
String expectedOperationName(String method) {

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.v5_0;
package io.opentelemetry.javaagent.instrumentation.elasticsearch.rest;
import static io.opentelemetry.javaagent.instrumentation.elasticsearch.rest.ElasticsearchRestClientTracer.tracer;

View File

@ -5,21 +5,15 @@ muzzle {
group = "org.elasticsearch.client"
module = "transport"
versions = "[6.0.0,)"
skipVersions += ['7.11.0']
assertInverse = true
}
pass {
group = "org.elasticsearch"
module = "elasticsearch"
versions = "[6.0.0,)"
}
fail {
group = "org.elasticsearch.client"
module = "transport"
versions = "[,6.0.0)"
}
fail {
group = "org.elasticsearch"
module = "elasticsearch"
versions = "[,6.0.0)"
skipVersions += ['7.11.0']
assertInverse = true
}
}

View File

@ -99,6 +99,7 @@ include ':instrumentation:eclipse-osgi-3.6:javaagent'
include ':instrumentation:elasticsearch:elasticsearch-rest-common:javaagent'
include ':instrumentation:elasticsearch:elasticsearch-rest-5.0:javaagent'
include ':instrumentation:elasticsearch:elasticsearch-rest-6.4:javaagent'
include ':instrumentation:elasticsearch:elasticsearch-rest-7.0:javaagent'
include ':instrumentation:elasticsearch:elasticsearch-transport-common:javaagent'
include ':instrumentation:elasticsearch:elasticsearch-transport-5.0:javaagent'
include ':instrumentation:elasticsearch:elasticsearch-transport-5.3:javaagent'