Add support for ES rest client 6.3 and above

This commit is contained in:
Will Gittoes 2019-01-30 17:39:20 +11:00
parent bd6f732400
commit f15a2ffd79
No known key found for this signature in database
GPG Key ID: 521026A02DB0BB42
9 changed files with 424 additions and 13 deletions

View File

@ -7,7 +7,7 @@ muzzle {
pass {
group = "org.elasticsearch.client"
module = "rest"
versions = "[5.0,)"
versions = "[5.0,6.4)"
assertInverse = true
}
}
@ -48,9 +48,9 @@ dependencies {
See https://github.com/elastic/elasticsearch/commit/0be443c5bbd4c7eb5776740d8fb7117224124cce#diff-d5bb3520f960a753d8f8a3a2686dfd6b
Lock on 6.2.+ to fix tests.
*/
latestDepTestCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.2.+'
latestDepTestCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '6.2.+'
latestDepTestCompile group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.2.+'
latestDepTestCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.3.+'
latestDepTestCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '6.3.+'
latestDepTestCompile group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.3.+'
}
configurations.latestDepTestCompile {

View File

@ -100,4 +100,13 @@ class Elasticsearch6RestClientTest extends AgentTestRunner {
}
}
}
// static class TestNode extends Node {
// TestNode(Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
// super(environment, classpathPlugins, false)
// }
//
// @Override
// protected void registerDerivedNodeNameWithLogger(String nodeName) {}
// }
}

View File

@ -4,7 +4,6 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
@ -47,8 +46,7 @@ public class Elasticsearch5RestClientInstrumentation extends Instrumenter.Defaul
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(isPublic())
.and(named("performRequestAsync"))
.and(named("performRequestAsync").or(named("performRequestAsyncNoCatch")))
.and(takesArguments(7))
.and(takesArgument(0, named("java.lang.String"))) // method
.and(takesArgument(1, named("java.lang.String"))) // endpoint
@ -60,11 +58,11 @@ public class Elasticsearch5RestClientInstrumentation extends Instrumenter.Defaul
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan(
@Advice.Argument(0) final String method,
@Advice.Argument(1) final String endpoint,
@Advice.Argument(0) String method,
@Advice.Argument(1) String endpoint,
@Advice.Argument(value = 5, readOnly = false) ResponseListener responseListener) {
final Scope scope =
Scope scope =
GlobalTracer.get()
.buildSpan("elasticsearch.rest.query")
.withTag(DDTags.SERVICE_NAME, "elasticsearch")
@ -80,10 +78,9 @@ public class Elasticsearch5RestClientInstrumentation extends Instrumenter.Defaul
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
public static void stopSpan(@Advice.Enter Scope scope, @Advice.Thrown Throwable throwable) {
if (throwable != null) {
final Span span = scope.span();
Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.finish();

View File

@ -0,0 +1,50 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}
muzzle {
pass {
group = "org.elasticsearch.client"
module = "rest" // elasticsearch-rest-client ?
versions = "[6.4,)"
assertInverse = true
}
}
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest
}
dependencies {
compileOnly group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.4.0'
compile project(':dd-java-agent:agent-tooling')
compile deps.bytebuddy
compile deps.opentracing
annotationProcessor deps.autoservice
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
// Include httpclient instrumentation for testing because it is a dependency for elasticsearch-rest-client.
// It doesn't actually work though. They use HttpAsyncClient, which isn't currently instrumented.
// TODO: add Apache's HttpAsyncClient instrumentation when that is complete.
testCompile project(':dd-java-agent:instrumentation:apache-httpclient-4')
testCompile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.0'
testCompile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.0'
testCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.4.0'
testCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '6.4.0'
testCompile group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.4.0'
latestDepTestCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '+'
latestDepTestCompile group: 'org.elasticsearch.client', name: 'transport', version: '6.+'
latestDepTestCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '6.+'
latestDepTestCompile group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.+'
}

View File

@ -0,0 +1,116 @@
import com.anotherchrisberry.spock.extensions.retry.RetryOnFailure
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import groovy.json.JsonSlurper
import io.opentracing.tag.Tags
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.Request
import org.elasticsearch.client.Response
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder
import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.env.Environment
import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node
import org.elasticsearch.plugins.Plugin
import org.elasticsearch.transport.Netty4Plugin
import spock.lang.Shared
@RetryOnFailure(times = 3, delaySeconds = 1)
class Elasticsearch6RestClientTest extends AgentTestRunner {
@Shared
int httpPort
@Shared
int tcpPort
@Shared
Node testNode
@Shared
File esWorkingDir
@Shared
RestClient client
def setupSpec() {
httpPort = TestUtils.randomOpenPort()
tcpPort = TestUtils.randomOpenPort()
esWorkingDir = File.createTempDir("test-es-working-dir-", "")
esWorkingDir.deleteOnExit()
println "ES work dir: $esWorkingDir"
def settings = Settings.builder()
.put("path.home", esWorkingDir.path)
.put("http.port", httpPort)
.put("transport.tcp.port", tcpPort)
.put("cluster.name", "test-cluster")
.build()
testNode = new TestNode(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin])
testNode.start()
client = RestClient.builder(new HttpHost("localhost", httpPort))
.setMaxRetryTimeoutMillis(Integer.MAX_VALUE)
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
return builder.setConnectTimeout(Integer.MAX_VALUE).setSocketTimeout(Integer.MAX_VALUE)
}
})
.build()
}
def cleanupSpec() {
testNode?.close()
if (esWorkingDir != null) {
FileSystemUtils.deleteSubDirectories(esWorkingDir.toPath())
esWorkingDir.delete()
}
}
def "test elasticsearch status"() {
setup:
Request request = new Request("GET", "_cluster/health")
Response response = client.performRequest(request)
Map result = new JsonSlurper().parseText(EntityUtils.toString(response.entity))
expect:
result.status == "green"
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GET _cluster/health"
operationName "elasticsearch.rest.query"
spanType DDSpanTypes.ELASTICSEARCH
parent()
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.ELASTICSEARCH
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_URL.key" "_cluster/health"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" httpPort
defaultTags()
}
}
}
}
}
static class TestNode extends Node {
TestNode(Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
super(environment, classpathPlugins, false)
}
@Override
protected void registerDerivedNodeNameWithLogger(String nodeName) {}
}
}

View File

@ -0,0 +1,90 @@
package datadog.trace.instrumentation.elasticsearch6_4;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseListener;
@AutoService(Instrumenter.class)
public class Elasticsearch6RestClientInstrumentation extends Instrumenter.Default {
public Elasticsearch6RestClientInstrumentation() {
super("elasticsearch", "elasticsearch-rest", "elasticsearch-rest-6");
}
@Override
public String[] helperClassNames() {
return new String[] {"datadog.trace.instrumentation.elasticsearch6_4.RestResponseListener"};
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface()).and(named("org.elasticsearch.client.RestClient"));
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(named("performRequestAsyncNoCatch"))
.and(takesArguments(2))
.and(takesArgument(0, named("org.elasticsearch.client.Request")))
.and(takesArgument(1, named("org.elasticsearch.client.ResponseListener"))),
ElasticsearchRestClientAdvice.class.getName());
}
public static class ElasticsearchRestClientAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan(
@Advice.Argument(0) Request request,
@Advice.Argument(value = 1, readOnly = false) ResponseListener responseListener) {
Scope scope =
GlobalTracer.get()
.buildSpan("elasticsearch.rest.query")
.withTag(DDTags.SERVICE_NAME, "elasticsearch")
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.ELASTICSEARCH)
.withTag(Tags.HTTP_METHOD.getKey(), request.getMethod())
.withTag(Tags.HTTP_URL.getKey(), request.getEndpoint())
.withTag(Tags.COMPONENT.getKey(), "elasticsearch-java")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.startActive(false);
responseListener = new RestResponseListener(responseListener, scope.span());
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(@Advice.Enter Scope scope, @Advice.Thrown Throwable throwable) {
if (throwable != null) {
Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.finish();
}
scope.close();
}
}
}

View File

@ -0,0 +1,45 @@
package datadog.trace.instrumentation.elasticsearch6_4;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
public class RestResponseListener implements ResponseListener {
private final ResponseListener listener;
private final Span span;
public RestResponseListener(ResponseListener listener, Span span) {
this.listener = listener;
this.span = span;
}
@Override
public void onSuccess(Response response) {
if (response.getHost() != null) {
Tags.PEER_HOSTNAME.set(span, response.getHost().getHostName());
Tags.PEER_PORT.set(span, response.getHost().getPort());
}
try {
listener.onSuccess(response);
} finally {
span.finish();
}
}
@Override
public void onFailure(Exception e) {
span.log(Collections.singletonMap(ERROR_OBJECT, e));
try {
listener.onFailure(e);
} finally {
span.finish();
}
}
}

View File

@ -0,0 +1,103 @@
import com.anotherchrisberry.spock.extensions.retry.RetryOnFailure
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import groovy.json.JsonSlurper
import io.opentracing.tag.Tags
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.Response
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder
import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node
import org.elasticsearch.transport.Netty4Plugin
import spock.lang.Shared
@RetryOnFailure(times = 3, delaySeconds = 1)
class Elasticsearch6RestClientTest extends AgentTestRunner {
@Shared
int httpPort
@Shared
int tcpPort
@Shared
Node testNode
@Shared
File esWorkingDir
@Shared
RestClient client
def setupSpec() {
httpPort = TestUtils.randomOpenPort()
tcpPort = TestUtils.randomOpenPort()
esWorkingDir = File.createTempDir("test-es-working-dir-", "")
esWorkingDir.deleteOnExit()
println "ES work dir: $esWorkingDir"
def settings = Settings.builder()
.put("path.home", esWorkingDir.path)
.put("http.port", httpPort)
.put("transport.tcp.port", tcpPort)
.put("cluster.name", "test-cluster")
.build()
testNode = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin])
testNode.start()
client = RestClient.builder(new HttpHost("localhost", httpPort))
.setMaxRetryTimeoutMillis(Integer.MAX_VALUE)
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
return builder.setConnectTimeout(Integer.MAX_VALUE).setSocketTimeout(Integer.MAX_VALUE)
}
})
.build()
}
def cleanupSpec() {
testNode?.close()
if (esWorkingDir != null) {
FileSystemUtils.deleteSubDirectories(esWorkingDir.toPath())
esWorkingDir.delete()
}
}
def "test elasticsearch status"() {
setup:
Response response = client.performRequest("GET", "_cluster/health")
Map result = new JsonSlurper().parseText(EntityUtils.toString(response.entity))
expect:
result.status == "green"
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GET _cluster/health"
operationName "elasticsearch.rest.query"
spanType DDSpanTypes.ELASTICSEARCH
parent()
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.ELASTICSEARCH
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_URL.key" "_cluster/health"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" httpPort
defaultTags()
}
}
}
}
}
}

View File

@ -33,6 +33,7 @@ include ':dd-java-agent:instrumentation:datastax-cassandra-2.3'
include ':dd-java-agent:instrumentation:dropwizard'
include ':dd-java-agent:instrumentation:dropwizard:dropwizard-views'
include ':dd-java-agent:instrumentation:elasticsearch-rest-5'
include ':dd-java-agent:instrumentation:elasticsearch-rest-6.4'
include ':dd-java-agent:instrumentation:elasticsearch-transport-2'
include ':dd-java-agent:instrumentation:elasticsearch-transport-5'
include ':dd-java-agent:instrumentation:elasticsearch-transport-5.3'