Merge pull request #679 from DataDog/willgittoes-dd/es-rest-6.4

Add support for Elasticsearch rest client 6.3 and above
This commit is contained in:
Will Gittoes 2019-01-31 11:02:59 +11:00 committed by GitHub
commit 5941a93ad4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 424 additions and 12 deletions

View File

@ -7,9 +7,15 @@ muzzle {
pass {
group = "org.elasticsearch.client"
module = "rest"
versions = "[5.0,)"
versions = "[5.0,6.4)"
assertInverse = true
}
pass {
group = "org.elasticsearch.client"
module = "elasticsearch-rest-client"
versions = "[5.0,6.4)"
}
}
apply from: "${rootDir}/gradle/java.gradle"
@ -43,14 +49,9 @@ dependencies {
testCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '5.0.0'
testCompile group: 'org.elasticsearch.plugin', name: 'transport-netty3-client', version: '5.0.0'
/*
We know that 6.3.+ Doesn't work because they've reworked relevant code.
See https://github.com/elastic/elasticsearch/commit/0be443c5bbd4c7eb5776740d8fb7117224124cce#diff-d5bb3520f960a753d8f8a3a2686dfd6b
Lock on 6.2.+ to fix tests.
*/
latestDepTestCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.2.+'
latestDepTestCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '6.2.+'
latestDepTestCompile group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.2.+'
latestDepTestCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.3.+'
latestDepTestCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '6.3.+'
latestDepTestCompile group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.3.+'
}
configurations.latestDepTestCompile {

View File

@ -4,7 +4,6 @@ import static io.opentracing.log.Fields.ERROR_OBJECT;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
@ -47,8 +46,7 @@ public class Elasticsearch5RestClientInstrumentation extends Instrumenter.Defaul
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(isPublic())
.and(named("performRequestAsync"))
.and(named("performRequestAsync").or(named("performRequestAsyncNoCatch")))
.and(takesArguments(7))
.and(takesArgument(0, named("java.lang.String"))) // method
.and(takesArgument(1, named("java.lang.String"))) // endpoint

View File

@ -0,0 +1,57 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}
muzzle {
pass {
group = "org.elasticsearch.client"
module = "relasticsearch-rest-client"
versions = "[6.4,)"
assertInverse = true
}
fail {
group = "org.elasticsearch.client"
module = "rest"
versions = "(,)"
assertInverse = true
}
}
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest
}
dependencies {
compileOnly group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.4.0'
compile project(':dd-java-agent:agent-tooling')
compile deps.bytebuddy
compile deps.opentracing
annotationProcessor deps.autoservice
implementation deps.autoservice
testCompile project(':dd-java-agent:testing')
// Include httpclient instrumentation for testing because it is a dependency for elasticsearch-rest-client.
// It doesn't actually work though. They use HttpAsyncClient, which isn't currently instrumented.
// TODO: add Apache's HttpAsyncClient instrumentation when that is complete.
testCompile project(':dd-java-agent:instrumentation:apache-httpclient-4')
testCompile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.0'
testCompile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.0'
testCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '6.4.0'
testCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '6.4.0'
testCompile group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.4.0'
latestDepTestCompile group: 'org.elasticsearch.client', name: 'elasticsearch-rest-client', version: '+'
latestDepTestCompile group: 'org.elasticsearch.client', name: 'transport', version: '6.+'
latestDepTestCompile group: 'org.elasticsearch', name: 'elasticsearch', version: '6.+'
latestDepTestCompile group: 'org.elasticsearch.plugin', name: 'transport-netty4-client', version: '6.+'
}

View File

@ -0,0 +1,116 @@
import com.anotherchrisberry.spock.extensions.retry.RetryOnFailure
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import groovy.json.JsonSlurper
import io.opentracing.tag.Tags
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.Request
import org.elasticsearch.client.Response
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder
import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.env.Environment
import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node
import org.elasticsearch.plugins.Plugin
import org.elasticsearch.transport.Netty4Plugin
import spock.lang.Shared
@RetryOnFailure(times = 3, delaySeconds = 1)
class Elasticsearch6RestClientTest extends AgentTestRunner {
@Shared
int httpPort
@Shared
int tcpPort
@Shared
Node testNode
@Shared
File esWorkingDir
@Shared
RestClient client
def setupSpec() {
httpPort = TestUtils.randomOpenPort()
tcpPort = TestUtils.randomOpenPort()
esWorkingDir = File.createTempDir("test-es-working-dir-", "")
esWorkingDir.deleteOnExit()
println "ES work dir: $esWorkingDir"
def settings = Settings.builder()
.put("path.home", esWorkingDir.path)
.put("http.port", httpPort)
.put("transport.tcp.port", tcpPort)
.put("cluster.name", "test-cluster")
.build()
testNode = new TestNode(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin])
testNode.start()
client = RestClient.builder(new HttpHost("localhost", httpPort))
.setMaxRetryTimeoutMillis(Integer.MAX_VALUE)
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
return builder.setConnectTimeout(Integer.MAX_VALUE).setSocketTimeout(Integer.MAX_VALUE)
}
})
.build()
}
def cleanupSpec() {
testNode?.close()
if (esWorkingDir != null) {
FileSystemUtils.deleteSubDirectories(esWorkingDir.toPath())
esWorkingDir.delete()
}
}
def "test elasticsearch status"() {
setup:
Request request = new Request("GET", "_cluster/health")
Response response = client.performRequest(request)
Map result = new JsonSlurper().parseText(EntityUtils.toString(response.entity))
expect:
result.status == "green"
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GET _cluster/health"
operationName "elasticsearch.rest.query"
spanType DDSpanTypes.ELASTICSEARCH
parent()
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.ELASTICSEARCH
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_URL.key" "_cluster/health"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" httpPort
defaultTags()
}
}
}
}
}
static class TestNode extends Node {
TestNode(Environment environment, Collection<Class<? extends Plugin>> classpathPlugins) {
super(environment, classpathPlugins, false)
}
@Override
protected void registerDerivedNodeNameWithLogger(String nodeName) {}
}
}

View File

@ -0,0 +1,91 @@
package datadog.trace.instrumentation.elasticsearch6_4;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseListener;
@AutoService(Instrumenter.class)
public class Elasticsearch6RestClientInstrumentation extends Instrumenter.Default {
public Elasticsearch6RestClientInstrumentation() {
super("elasticsearch", "elasticsearch-rest", "elasticsearch-rest-6");
}
@Override
public String[] helperClassNames() {
return new String[] {"datadog.trace.instrumentation.elasticsearch6_4.RestResponseListener"};
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface()).and(named("org.elasticsearch.client.RestClient"));
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod()
.and(named("performRequestAsyncNoCatch"))
.and(takesArguments(2))
.and(takesArgument(0, named("org.elasticsearch.client.Request")))
.and(takesArgument(1, named("org.elasticsearch.client.ResponseListener"))),
ElasticsearchRestClientAdvice.class.getName());
}
public static class ElasticsearchRestClientAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static Scope startSpan(
@Advice.Argument(0) final Request request,
@Advice.Argument(value = 1, readOnly = false) ResponseListener responseListener) {
final Scope scope =
GlobalTracer.get()
.buildSpan("elasticsearch.rest.query")
.withTag(DDTags.SERVICE_NAME, "elasticsearch")
.withTag(DDTags.SPAN_TYPE, DDSpanTypes.ELASTICSEARCH)
.withTag(Tags.HTTP_METHOD.getKey(), request.getMethod())
.withTag(Tags.HTTP_URL.getKey(), request.getEndpoint())
.withTag(Tags.COMPONENT.getKey(), "elasticsearch-java")
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.startActive(false);
responseListener = new RestResponseListener(responseListener, scope.span());
return scope;
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Enter final Scope scope, @Advice.Thrown final Throwable throwable) {
if (throwable != null) {
final Span span = scope.span();
Tags.ERROR.set(span, true);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
span.finish();
}
scope.close();
}
}
}

View File

@ -0,0 +1,45 @@
package datadog.trace.instrumentation.elasticsearch6_4;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import java.util.Collections;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
public class RestResponseListener implements ResponseListener {
private final ResponseListener listener;
private final Span span;
public RestResponseListener(final ResponseListener listener, final Span span) {
this.listener = listener;
this.span = span;
}
@Override
public void onSuccess(final Response response) {
if (response.getHost() != null) {
Tags.PEER_HOSTNAME.set(span, response.getHost().getHostName());
Tags.PEER_PORT.set(span, response.getHost().getPort());
}
try {
listener.onSuccess(response);
} finally {
span.finish();
}
}
@Override
public void onFailure(final Exception e) {
span.log(Collections.singletonMap(ERROR_OBJECT, e));
try {
listener.onFailure(e);
} finally {
span.finish();
}
}
}

View File

@ -0,0 +1,103 @@
import com.anotherchrisberry.spock.extensions.retry.RetryOnFailure
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.DDTags
import groovy.json.JsonSlurper
import io.opentracing.tag.Tags
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.http.util.EntityUtils
import org.elasticsearch.client.Response
import org.elasticsearch.client.RestClient
import org.elasticsearch.client.RestClientBuilder
import org.elasticsearch.common.io.FileSystemUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.node.InternalSettingsPreparer
import org.elasticsearch.node.Node
import org.elasticsearch.transport.Netty4Plugin
import spock.lang.Shared
@RetryOnFailure(times = 3, delaySeconds = 1)
class Elasticsearch6RestClientTest extends AgentTestRunner {
@Shared
int httpPort
@Shared
int tcpPort
@Shared
Node testNode
@Shared
File esWorkingDir
@Shared
RestClient client
def setupSpec() {
httpPort = TestUtils.randomOpenPort()
tcpPort = TestUtils.randomOpenPort()
esWorkingDir = File.createTempDir("test-es-working-dir-", "")
esWorkingDir.deleteOnExit()
println "ES work dir: $esWorkingDir"
def settings = Settings.builder()
.put("path.home", esWorkingDir.path)
.put("http.port", httpPort)
.put("transport.tcp.port", tcpPort)
.put("cluster.name", "test-cluster")
.build()
testNode = new Node(InternalSettingsPreparer.prepareEnvironment(settings, null), [Netty4Plugin])
testNode.start()
client = RestClient.builder(new HttpHost("localhost", httpPort))
.setMaxRetryTimeoutMillis(Integer.MAX_VALUE)
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
return builder.setConnectTimeout(Integer.MAX_VALUE).setSocketTimeout(Integer.MAX_VALUE)
}
})
.build()
}
def cleanupSpec() {
testNode?.close()
if (esWorkingDir != null) {
FileSystemUtils.deleteSubDirectories(esWorkingDir.toPath())
esWorkingDir.delete()
}
}
def "test elasticsearch status"() {
setup:
Response response = client.performRequest("GET", "_cluster/health")
Map result = new JsonSlurper().parseText(EntityUtils.toString(response.entity))
expect:
result.status == "green"
assertTraces(1) {
trace(0, 1) {
span(0) {
serviceName "elasticsearch"
resourceName "GET _cluster/health"
operationName "elasticsearch.rest.query"
spanType DDSpanTypes.ELASTICSEARCH
parent()
tags {
"$Tags.COMPONENT.key" "elasticsearch-java"
"$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT
"$DDTags.SPAN_TYPE" DDSpanTypes.ELASTICSEARCH
"$Tags.HTTP_METHOD.key" "GET"
"$Tags.HTTP_URL.key" "_cluster/health"
"$Tags.PEER_HOSTNAME.key" "localhost"
"$Tags.PEER_PORT.key" httpPort
defaultTags()
}
}
}
}
}
}

View File

@ -33,6 +33,7 @@ include ':dd-java-agent:instrumentation:datastax-cassandra-2.3'
include ':dd-java-agent:instrumentation:dropwizard'
include ':dd-java-agent:instrumentation:dropwizard:dropwizard-views'
include ':dd-java-agent:instrumentation:elasticsearch-rest-5'
include ':dd-java-agent:instrumentation:elasticsearch-rest-6.4'
include ':dd-java-agent:instrumentation:elasticsearch-transport-2'
include ':dd-java-agent:instrumentation:elasticsearch-transport-5'
include ':dd-java-agent:instrumentation:elasticsearch-transport-5.3'