Convert play-ws groovy to java (#12377)
Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
parent
00de5e3016
commit
0b9dffaf98
|
@ -1,12 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
class PlayJavaWsClientTest extends PlayJavaWsClientTestBase {}
|
||||
|
||||
class PlayJavaStreamedWsClientTest extends PlayJavaStreamedWsClientTestBase {}
|
||||
|
||||
class PlayScalaWsClientTest extends PlayScalaWsClientTestBase {}
|
||||
|
||||
class PlayScalaStreamedWsClientTest extends PlayScalaStreamedWsClientTestBase {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v1_0;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayJavaStreamedWsClientBaseTest;
|
||||
|
||||
class PlayJavaStreamedWsClientTest extends PlayJavaStreamedWsClientBaseTest {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v1_0;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayJavaWsClientBaseTest;
|
||||
|
||||
class PlayJavaWsClientTest extends PlayJavaWsClientBaseTest {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v1_0;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayScalaStreamedWsClientBaseTest;
|
||||
|
||||
class PlayScalaStreamedWsClientTest extends PlayScalaStreamedWsClientBaseTest {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v1_0;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayScalaWsClientBaseTest;
|
||||
|
||||
class PlayScalaWsClientTest extends PlayScalaWsClientBaseTest {}
|
|
@ -1,12 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
class PlayJavaWsClientTest extends PlayJavaWsClientTestBase {}
|
||||
|
||||
class PlayJavaStreamedWsClientTest extends PlayJavaStreamedWsClientTestBase {}
|
||||
|
||||
class PlayScalaWsClientTest extends PlayScalaWsClientTestBase {}
|
||||
|
||||
class PlayScalaStreamedWsClientTest extends PlayScalaStreamedWsClientTestBase {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_0;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayJavaStreamedWsClientBaseTest;
|
||||
|
||||
class PlayJavaStreamedWsClientTest extends PlayJavaStreamedWsClientBaseTest {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_0;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayJavaWsClientBaseTest;
|
||||
|
||||
class PlayJavaWsClientTest extends PlayJavaWsClientBaseTest {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_0;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayScalaStreamedWsClientBaseTest;
|
||||
|
||||
class PlayScalaStreamedWsClientTest extends PlayScalaStreamedWsClientBaseTest {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_0;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayScalaWsClientBaseTest;
|
||||
|
||||
class PlayScalaWsClientTest extends PlayScalaWsClientBaseTest {}
|
|
@ -41,15 +41,33 @@ dependencies {
|
|||
testInstrumentation(project(":instrumentation:netty:netty-4.1:javaagent"))
|
||||
testInstrumentation(project(":instrumentation:akka:akka-http-10.0:javaagent"))
|
||||
testInstrumentation(project(":instrumentation:akka:akka-actor-2.3:javaagent"))
|
||||
|
||||
latestDepTestLibrary("com.typesafe.play:play-ahc-ws-standalone_2.13:+")
|
||||
}
|
||||
|
||||
if (findProperty("testLatestDeps") as Boolean) {
|
||||
configurations {
|
||||
// play-ws artifact name is different for regular and latest tests
|
||||
testImplementation {
|
||||
exclude("com.typesafe.play", "play-ahc-ws-standalone_$scalaVersion")
|
||||
val testLatestDeps = findProperty("testLatestDeps") as Boolean
|
||||
|
||||
testing {
|
||||
suites {
|
||||
val latestDepTest by registering(JvmTestSuite::class) {
|
||||
dependencies {
|
||||
implementation("com.typesafe.play:play-ahc-ws-standalone_2.13:+")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tasks {
|
||||
if (testLatestDeps) {
|
||||
// disable regular test running and compiling tasks when latest dep test task is run
|
||||
named("test") {
|
||||
enabled = false
|
||||
}
|
||||
}
|
||||
|
||||
named("latestDepTest") {
|
||||
enabled = testLatestDeps
|
||||
}
|
||||
|
||||
check {
|
||||
dependsOn(testing.suites)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_1;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import play.libs.ws.StandaloneWSClient;
|
||||
import play.libs.ws.StandaloneWSRequest;
|
||||
import play.libs.ws.StandaloneWSResponse;
|
||||
import play.libs.ws.ahc.StandaloneAhcWSClient;
|
||||
|
||||
class PlayJavaStreamedWsClientTest extends PlayWsClientBaseTest<StandaloneWSRequest> {
|
||||
|
||||
private static StandaloneWSClient wsClient;
|
||||
private static StandaloneWSClient wsClientWithReadTimeout;
|
||||
|
||||
@BeforeAll
|
||||
static void setup() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer);
|
||||
wsClientWithReadTimeout =
|
||||
new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanup() throws IOException {
|
||||
wsClient.close();
|
||||
wsClientWithReadTimeout.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
StandaloneWSRequest request = getClient(uri).url(uri.toString()).setFollowRedirects(true);
|
||||
headers.forEach(request::addHeader);
|
||||
request.setMethod(method);
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendRequest(
|
||||
StandaloneWSRequest request, String method, URI uri, Map<String, String> headers)
|
||||
throws ExecutionException, InterruptedException {
|
||||
return internalSendRequest(request).toCompletableFuture().get().getStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequestWithCallback(
|
||||
StandaloneWSRequest request,
|
||||
String method,
|
||||
URI uri,
|
||||
Map<String, String> headers,
|
||||
HttpClientResult requestResult) {
|
||||
internalSendRequest(request)
|
||||
.whenComplete(
|
||||
(response, throwable) -> {
|
||||
if (throwable != null) {
|
||||
requestResult.complete(throwable.getCause());
|
||||
} else {
|
||||
requestResult.complete(response.getStatus());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static CompletionStage<StandaloneWSResponse> internalSendRequest(
|
||||
StandaloneWSRequest request) {
|
||||
CompletionStage<? extends StandaloneWSResponse> stream = request.stream();
|
||||
// The status can be ready before the body so explicitly call wait for body to be ready
|
||||
return stream
|
||||
.thenCompose(
|
||||
response -> response.getBodyAsSource().runFold("", (acc, out) -> "", materializer))
|
||||
.thenCombine(stream, (body, response) -> response);
|
||||
}
|
||||
|
||||
private static StandaloneWSClient getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout;
|
||||
}
|
||||
return wsClient;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_1;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import play.libs.ws.StandaloneWSClient;
|
||||
import play.libs.ws.StandaloneWSRequest;
|
||||
import play.libs.ws.ahc.StandaloneAhcWSClient;
|
||||
|
||||
class PlayJavaWsClientTest extends PlayWsClientBaseTest<StandaloneWSRequest> {
|
||||
|
||||
private static StandaloneWSClient wsClient;
|
||||
private static StandaloneWSClient wsClientWithReadTimeout;
|
||||
|
||||
@BeforeAll
|
||||
static void setup() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer);
|
||||
wsClientWithReadTimeout =
|
||||
new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanup() throws IOException {
|
||||
wsClient.close();
|
||||
wsClientWithReadTimeout.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
StandaloneWSRequest request = getClient(uri).url(uri.toString()).setFollowRedirects(true);
|
||||
headers.forEach(request::addHeader);
|
||||
return request.setMethod(method);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendRequest(
|
||||
StandaloneWSRequest request, String method, URI uri, Map<String, String> headers)
|
||||
throws ExecutionException, InterruptedException {
|
||||
return request.execute().toCompletableFuture().get().getStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequestWithCallback(
|
||||
StandaloneWSRequest request,
|
||||
String method,
|
||||
URI uri,
|
||||
Map<String, String> headers,
|
||||
HttpClientResult requestResult) {
|
||||
request
|
||||
.execute()
|
||||
.whenComplete(
|
||||
(response, throwable) -> {
|
||||
if (throwable != null) {
|
||||
requestResult.complete(throwable);
|
||||
} else {
|
||||
requestResult.complete(response.getStatus());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static StandaloneWSClient getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout;
|
||||
}
|
||||
return wsClient;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_1;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import play.api.libs.ws.StandaloneWSClient;
|
||||
import play.api.libs.ws.StandaloneWSRequest;
|
||||
import play.api.libs.ws.StandaloneWSResponse;
|
||||
import play.api.libs.ws.ahc.StandaloneAhcWSClient;
|
||||
import scala.Function1;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.jdk.javaapi.CollectionConverters;
|
||||
import scala.util.Try;
|
||||
|
||||
class PlayScalaStreamedWsClientTest extends PlayWsClientBaseTest<StandaloneWSRequest> {
|
||||
|
||||
private static StandaloneWSClient wsClient;
|
||||
private static StandaloneWSClient wsClientWithReadTimeout;
|
||||
|
||||
@BeforeAll
|
||||
void setup() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer);
|
||||
wsClientWithReadTimeout =
|
||||
new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanup() throws IOException {
|
||||
wsClient.close();
|
||||
wsClientWithReadTimeout.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers)
|
||||
throws Exception {
|
||||
return getClient(uri)
|
||||
.url(uri.toURL().toString())
|
||||
.withMethod(method)
|
||||
.withFollowRedirects(true)
|
||||
.withHttpHeaders(CollectionConverters.asScala(headers).toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendRequest(
|
||||
StandaloneWSRequest request, String method, URI uri, Map<String, String> headers)
|
||||
throws Exception {
|
||||
return Await.result(internalSendRequest(request), Duration.apply(10, TimeUnit.SECONDS))
|
||||
.status();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequestWithCallback(
|
||||
StandaloneWSRequest request,
|
||||
String method,
|
||||
URI uri,
|
||||
Map<String, String> headers,
|
||||
HttpClientResult requestResult) {
|
||||
internalSendRequest(request)
|
||||
.onComplete(
|
||||
new Function1<Try<StandaloneWSResponse>, Void>() {
|
||||
@Override
|
||||
public Void apply(Try<StandaloneWSResponse> response) {
|
||||
if (response.isSuccess()) {
|
||||
requestResult.complete(response.get().status());
|
||||
} else {
|
||||
requestResult.complete(response.failed().get());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
},
|
||||
ExecutionContext.global());
|
||||
}
|
||||
|
||||
private static Future<StandaloneWSResponse> internalSendRequest(StandaloneWSRequest request) {
|
||||
Future<StandaloneWSResponse> futureResponse = request.stream();
|
||||
// The status can be ready before the body so explicitly call wait for body to be ready
|
||||
Future<String> bodyResponse =
|
||||
futureResponse.flatMap(
|
||||
new Function1<StandaloneWSResponse, Future<String>>() {
|
||||
@Override
|
||||
public Future<String> apply(StandaloneWSResponse wsResponse) {
|
||||
return wsResponse.bodyAsSource().runFold("", (acc, out) -> "", materializer);
|
||||
}
|
||||
},
|
||||
ExecutionContext.global());
|
||||
return bodyResponse.flatMap(
|
||||
new Function1<String, Future<StandaloneWSResponse>>() {
|
||||
@Override
|
||||
public Future<StandaloneWSResponse> apply(String v1) {
|
||||
return futureResponse;
|
||||
}
|
||||
},
|
||||
ExecutionContext.global());
|
||||
}
|
||||
|
||||
private static StandaloneWSClient getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout;
|
||||
}
|
||||
return wsClient;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_1;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import play.api.libs.ws.StandaloneWSClient;
|
||||
import play.api.libs.ws.StandaloneWSRequest;
|
||||
import play.api.libs.ws.StandaloneWSResponse;
|
||||
import play.api.libs.ws.ahc.StandaloneAhcWSClient;
|
||||
import scala.Function1;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.jdk.javaapi.CollectionConverters;
|
||||
import scala.util.Try;
|
||||
|
||||
class PlayScalaWsClientTest extends PlayWsClientBaseTest<StandaloneWSRequest> {
|
||||
|
||||
private static StandaloneWSClient wsClient;
|
||||
private static StandaloneWSClient wsClientWithReadTimeout;
|
||||
|
||||
@BeforeAll
|
||||
void setup() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer);
|
||||
wsClientWithReadTimeout =
|
||||
new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanup() throws IOException {
|
||||
wsClient.close();
|
||||
wsClientWithReadTimeout.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers)
|
||||
throws MalformedURLException {
|
||||
return getClient(uri)
|
||||
.url(uri.toURL().toString())
|
||||
.withMethod(method)
|
||||
.withFollowRedirects(true)
|
||||
.withHttpHeaders(CollectionConverters.asScala(headers).toSeq());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendRequest(
|
||||
StandaloneWSRequest request, String method, URI uri, Map<String, String> headers)
|
||||
throws Exception {
|
||||
Future<StandaloneWSResponse> futureResponse = request.execute();
|
||||
Await.ready(futureResponse, Duration.apply(10, TimeUnit.SECONDS));
|
||||
Try<StandaloneWSResponse> value = futureResponse.value().get();
|
||||
if (value.isSuccess()) {
|
||||
return value.get().status();
|
||||
}
|
||||
// Catch the Throwable and rethrow it
|
||||
throw (Exception) value.failed().get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequestWithCallback(
|
||||
StandaloneWSRequest request,
|
||||
String method,
|
||||
URI uri,
|
||||
Map<String, String> headers,
|
||||
HttpClientResult requestResult) {
|
||||
request
|
||||
.execute()
|
||||
.onComplete(
|
||||
new Function1<Try<StandaloneWSResponse>, Void>() {
|
||||
@Override
|
||||
public Void apply(Try<StandaloneWSResponse> response) {
|
||||
if (response.isSuccess()) {
|
||||
requestResult.complete(response.get().status());
|
||||
} else {
|
||||
requestResult.complete(response.failed().get());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
},
|
||||
ExecutionContext.global());
|
||||
}
|
||||
|
||||
private static StandaloneWSClient getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout;
|
||||
}
|
||||
return wsClient;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,133 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_1;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.Materializer;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
|
||||
import io.opentelemetry.semconv.NetworkAttributes;
|
||||
import io.opentelemetry.semconv.ServerAttributes;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import play.shaded.ahc.io.netty.resolver.InetNameResolver;
|
||||
import play.shaded.ahc.io.netty.util.concurrent.EventExecutor;
|
||||
import play.shaded.ahc.io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
import play.shaded.ahc.io.netty.util.concurrent.Promise;
|
||||
import play.shaded.ahc.org.asynchttpclient.AsyncHttpClient;
|
||||
import play.shaded.ahc.org.asynchttpclient.AsyncHttpClientConfig;
|
||||
import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClient;
|
||||
import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClientConfig;
|
||||
import play.shaded.ahc.org.asynchttpclient.RequestBuilderBase;
|
||||
|
||||
abstract class PlayWsClientBaseTest<REQUEST> extends AbstractHttpClientTest<REQUEST> {
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();
|
||||
|
||||
private static ActorSystem system;
|
||||
protected static AsyncHttpClient asyncHttpClient;
|
||||
protected static AsyncHttpClient asyncHttpClientWithReadTimeout;
|
||||
protected static Materializer materializer;
|
||||
|
||||
@BeforeAll
|
||||
static void setupHttpClient() {
|
||||
String name = "play-ws";
|
||||
system = ActorSystem.create(name);
|
||||
materializer = Materializer.matFromSystem(system);
|
||||
|
||||
// Replace dns name resolver with custom implementation that returns only once address for each
|
||||
// host. This is needed for "connection error dropped request" because in case of connection
|
||||
// failure ahc will try the next address which isn't necessary for this test.
|
||||
RequestBuilderBase.DEFAULT_NAME_RESOLVER =
|
||||
new CustomNameResolver(ImmediateEventExecutor.INSTANCE);
|
||||
|
||||
asyncHttpClient = createClient(false);
|
||||
asyncHttpClientWithReadTimeout = createClient(true);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanupHttpClient() throws IOException {
|
||||
asyncHttpClient.close();
|
||||
asyncHttpClientWithReadTimeout.close();
|
||||
system.terminate();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
|
||||
super.configure(optionsBuilder);
|
||||
// apparently play ws does not report the 302 status code
|
||||
optionsBuilder.setResponseCodeOnRedirectError(null);
|
||||
optionsBuilder.setMaxRedirects(3);
|
||||
optionsBuilder.spanEndsAfterBody();
|
||||
optionsBuilder.setHttpAttributes(
|
||||
uri -> {
|
||||
Set<AttributeKey<?>> attributes =
|
||||
new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES);
|
||||
attributes.remove(NetworkAttributes.NETWORK_PROTOCOL_VERSION);
|
||||
if (uri.toString().endsWith("/circular-redirect")) {
|
||||
attributes.remove(ServerAttributes.SERVER_ADDRESS);
|
||||
attributes.remove(ServerAttributes.SERVER_PORT);
|
||||
}
|
||||
return attributes;
|
||||
});
|
||||
}
|
||||
|
||||
private static AsyncHttpClient createClient(boolean readTimeout) {
|
||||
DefaultAsyncHttpClientConfig.Builder builder =
|
||||
new DefaultAsyncHttpClientConfig.Builder()
|
||||
.setMaxRequestRetry(0)
|
||||
.setShutdownQuietPeriod(0)
|
||||
.setShutdownTimeout(0)
|
||||
.setMaxRedirects(3)
|
||||
.setConnectTimeout(5000);
|
||||
|
||||
if (readTimeout) {
|
||||
builder.setReadTimeout(2000);
|
||||
}
|
||||
|
||||
AsyncHttpClientConfig asyncHttpClientConfig = builder.build();
|
||||
return new DefaultAsyncHttpClient(asyncHttpClientConfig);
|
||||
}
|
||||
|
||||
private static class CustomNameResolver extends InetNameResolver {
|
||||
|
||||
public CustomNameResolver(EventExecutor executor) {
|
||||
super(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception {
|
||||
try {
|
||||
promise.setSuccess(InetAddress.getByName(inetHost));
|
||||
} catch (UnknownHostException exception) {
|
||||
promise.setFailure(exception);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise)
|
||||
throws Exception {
|
||||
try {
|
||||
// default implementation calls InetAddress.getAllByName
|
||||
promise.setSuccess(Collections.singletonList(InetAddress.getByName(inetHost)));
|
||||
} catch (UnknownHostException exception) {
|
||||
promise.setFailure(exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,12 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
class PlayJavaWsClientTest extends PlayJavaWsClientTestBase {}
|
||||
|
||||
class PlayJavaStreamedWsClientTest extends PlayJavaStreamedWsClientTestBase {}
|
||||
|
||||
class PlayScalaWsClientTest extends PlayScalaWsClientTestBase {}
|
||||
|
||||
class PlayScalaStreamedWsClientTest extends PlayScalaStreamedWsClientTestBase {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_1;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayJavaStreamedWsClientBaseTest;
|
||||
|
||||
class PlayJavaStreamedWsClientTest extends PlayJavaStreamedWsClientBaseTest {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_1;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayJavaWsClientBaseTest;
|
||||
|
||||
class PlayJavaWsClientTest extends PlayJavaWsClientBaseTest {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_1;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayScalaStreamedWsClientBaseTest;
|
||||
|
||||
class PlayScalaStreamedWsClientTest extends PlayScalaStreamedWsClientBaseTest {}
|
|
@ -0,0 +1,10 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws.v2_1;
|
||||
|
||||
import io.opentelemetry.javaagent.instrumentation.playws.PlayScalaWsClientBaseTest;
|
||||
|
||||
class PlayScalaWsClientTest extends PlayScalaWsClientBaseTest {}
|
|
@ -8,7 +8,5 @@ dependencies {
|
|||
api(project(":testing-common"))
|
||||
api("com.typesafe.play:play-ahc-ws-standalone_$scalaVersion:1.0.2")
|
||||
|
||||
implementation("org.apache.groovy:groovy")
|
||||
implementation("io.opentelemetry:opentelemetry-api")
|
||||
implementation("org.spockframework:spock-core")
|
||||
}
|
||||
|
|
|
@ -1,247 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult
|
||||
import play.libs.ws.StandaloneWSClient
|
||||
import play.libs.ws.StandaloneWSRequest
|
||||
import play.libs.ws.StandaloneWSResponse
|
||||
import play.libs.ws.ahc.StandaloneAhcWSClient
|
||||
import scala.Function1
|
||||
import scala.collection.JavaConverters
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.util.Try
|
||||
import spock.lang.Shared
|
||||
|
||||
import java.util.concurrent.CompletionStage
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
class PlayJavaWsClientTestBase extends PlayWsClientTestBaseBase<StandaloneWSRequest> {
|
||||
@Shared
|
||||
StandaloneWSClient wsClient
|
||||
@Shared
|
||||
StandaloneWSClient wsClientWithReadTimeout
|
||||
|
||||
@Override
|
||||
StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
def request = getClient(uri).url(uri.toURL().toString()).setFollowRedirects(true)
|
||||
headers.entrySet().each { entry -> request.addHeader(entry.getKey(), entry.getValue()) }
|
||||
return request.setMethod(method)
|
||||
}
|
||||
|
||||
@Override
|
||||
int sendRequest(StandaloneWSRequest request, String method, URI uri, Map<String, String> headers) {
|
||||
return request.execute().toCompletableFuture().get().status
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendRequestWithCallback(StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, HttpClientResult requestResult) {
|
||||
request.execute().whenComplete { response, throwable ->
|
||||
requestResult.complete({ response.status }, throwable)
|
||||
}
|
||||
}
|
||||
|
||||
def getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout
|
||||
}
|
||||
return wsClient
|
||||
}
|
||||
|
||||
def setupSpec() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer)
|
||||
wsClientWithReadTimeout = new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer)
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
wsClient?.close()
|
||||
wsClientWithReadTimeout?.close()
|
||||
}
|
||||
}
|
||||
|
||||
class PlayJavaStreamedWsClientTestBase extends PlayWsClientTestBaseBase<StandaloneWSRequest> {
|
||||
@Shared
|
||||
StandaloneWSClient wsClient
|
||||
@Shared
|
||||
StandaloneWSClient wsClientWithReadTimeout
|
||||
|
||||
@Override
|
||||
StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
def request = getClient(uri).url(uri.toURL().toString()).setFollowRedirects(true)
|
||||
headers.entrySet().each { entry -> request.addHeader(entry.getKey(), entry.getValue()) }
|
||||
request.setMethod(method)
|
||||
return request
|
||||
}
|
||||
|
||||
@Override
|
||||
int sendRequest(StandaloneWSRequest request, String method, URI uri, Map<String, String> headers) {
|
||||
return internalSendRequest(request).toCompletableFuture().get().status
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendRequestWithCallback(StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, HttpClientResult requestResult) {
|
||||
internalSendRequest(request).whenComplete { response, throwable ->
|
||||
requestResult.complete({ response.status }, throwable?.getCause())
|
||||
}
|
||||
}
|
||||
|
||||
private CompletionStage<StandaloneWSResponse> internalSendRequest(StandaloneWSRequest request) {
|
||||
def stream = request.stream()
|
||||
// The status can be ready before the body so explicitly call wait for body to be ready
|
||||
return stream
|
||||
.thenCompose { StandaloneWSResponse response ->
|
||||
response.getBodyAsSource().runFold("", { acc, out -> "" }, materializer)
|
||||
}
|
||||
.thenCombine(stream) { String body, StandaloneWSResponse response ->
|
||||
response
|
||||
}
|
||||
}
|
||||
|
||||
def getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout
|
||||
}
|
||||
return wsClient
|
||||
}
|
||||
|
||||
def setupSpec() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer)
|
||||
wsClientWithReadTimeout = new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer)
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
wsClient?.close()
|
||||
wsClientWithReadTimeout?.close()
|
||||
}
|
||||
}
|
||||
|
||||
class PlayScalaWsClientTestBase extends PlayWsClientTestBaseBase<play.api.libs.ws.StandaloneWSRequest> {
|
||||
@Shared
|
||||
play.api.libs.ws.StandaloneWSClient wsClient
|
||||
@Shared
|
||||
play.api.libs.ws.StandaloneWSClient wsClientWithReadTimeout
|
||||
|
||||
@Override
|
||||
play.api.libs.ws.StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
return getClient(uri).url(uri.toURL().toString())
|
||||
.withMethod(method)
|
||||
.withFollowRedirects(true)
|
||||
.withHttpHeaders(JavaConverters.mapAsScalaMap(headers).toSeq())
|
||||
}
|
||||
|
||||
@Override
|
||||
int sendRequest(play.api.libs.ws.StandaloneWSRequest request, String method, URI uri, Map<String, String> headers) {
|
||||
def futureResponse = request.execute()
|
||||
Await.ready(futureResponse, Duration.apply(10, TimeUnit.SECONDS))
|
||||
def value = futureResponse.value().get()
|
||||
if (value.isSuccess()) {
|
||||
return value.get().status()
|
||||
}
|
||||
throw value.failed().get()
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendRequestWithCallback(play.api.libs.ws.StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, HttpClientResult requestResult) {
|
||||
request.execute().onComplete(new Function1<Try<play.api.libs.ws.StandaloneWSResponse>, Void>() {
|
||||
@Override
|
||||
Void apply(Try<play.api.libs.ws.StandaloneWSResponse> response) {
|
||||
if (response.isSuccess()) {
|
||||
requestResult.complete(response.get().status())
|
||||
} else {
|
||||
requestResult.complete(response.failed().get())
|
||||
}
|
||||
return null
|
||||
}
|
||||
}, ExecutionContext.global())
|
||||
}
|
||||
|
||||
def getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout
|
||||
}
|
||||
return wsClient
|
||||
}
|
||||
|
||||
def setupSpec() {
|
||||
wsClient = new play.api.libs.ws.ahc.StandaloneAhcWSClient(asyncHttpClient, materializer)
|
||||
wsClientWithReadTimeout = new play.api.libs.ws.ahc.StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer)
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
wsClient?.close()
|
||||
wsClientWithReadTimeout?.close()
|
||||
}
|
||||
}
|
||||
|
||||
class PlayScalaStreamedWsClientTestBase extends PlayWsClientTestBaseBase<play.api.libs.ws.StandaloneWSRequest> {
|
||||
@Shared
|
||||
play.api.libs.ws.StandaloneWSClient wsClient
|
||||
@Shared
|
||||
play.api.libs.ws.StandaloneWSClient wsClientWithReadTimeout
|
||||
|
||||
@Override
|
||||
play.api.libs.ws.StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
return getClient(uri).url(uri.toURL().toString())
|
||||
.withMethod(method)
|
||||
.withFollowRedirects(true)
|
||||
.withHttpHeaders(JavaConverters.mapAsScalaMap(headers).toSeq())
|
||||
}
|
||||
|
||||
@Override
|
||||
int sendRequest(play.api.libs.ws.StandaloneWSRequest request, String method, URI uri, Map<String, String> headers) {
|
||||
Await.result(internalSendRequest(request), Duration.apply(10, TimeUnit.SECONDS)).status()
|
||||
}
|
||||
|
||||
@Override
|
||||
void sendRequestWithCallback(play.api.libs.ws.StandaloneWSRequest request, String method, URI uri, Map<String, String> headers, HttpClientResult requestResult) {
|
||||
internalSendRequest(request).onComplete(new Function1<Try<play.api.libs.ws.StandaloneWSResponse>, Void>() {
|
||||
@Override
|
||||
Void apply(Try<play.api.libs.ws.StandaloneWSResponse> response) {
|
||||
if (response.isSuccess()) {
|
||||
requestResult.complete(response.get().status())
|
||||
} else {
|
||||
requestResult.complete(response.failed().get())
|
||||
}
|
||||
return null
|
||||
}
|
||||
}, ExecutionContext.global())
|
||||
}
|
||||
|
||||
private Future<play.api.libs.ws.StandaloneWSResponse> internalSendRequest(play.api.libs.ws.StandaloneWSRequest request) {
|
||||
Future<play.api.libs.ws.StandaloneWSResponse> futureResponse = request.stream()
|
||||
// The status can be ready before the body so explicitly call wait for body to be ready
|
||||
Future<String> bodyResponse = futureResponse.flatMap(new Function1<play.api.libs.ws.StandaloneWSResponse, Future<String>>() {
|
||||
@Override
|
||||
Future<String> apply(play.api.libs.ws.StandaloneWSResponse wsResponse) {
|
||||
return wsResponse.bodyAsSource().runFold("", { acc, out -> "" }, materializer)
|
||||
}
|
||||
}, ExecutionContext.global())
|
||||
return bodyResponse.flatMap(new Function1<String, Future<play.api.libs.ws.StandaloneWSResponse>>() {
|
||||
@Override
|
||||
Future<play.api.libs.ws.StandaloneWSResponse> apply(String v1) {
|
||||
return futureResponse
|
||||
}
|
||||
}, ExecutionContext.global())
|
||||
}
|
||||
|
||||
def getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout
|
||||
}
|
||||
return wsClient
|
||||
}
|
||||
|
||||
def setupSpec() {
|
||||
wsClient = new play.api.libs.ws.ahc.StandaloneAhcWSClient(asyncHttpClient, materializer)
|
||||
wsClientWithReadTimeout = new play.api.libs.ws.ahc.StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer)
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
wsClient?.close()
|
||||
wsClientWithReadTimeout?.close()
|
||||
}
|
||||
}
|
|
@ -1,119 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
import io.opentelemetry.api.common.AttributeKey
|
||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
||||
import io.opentelemetry.instrumentation.test.base.HttpClientTest
|
||||
import io.opentelemetry.semconv.ServerAttributes
|
||||
import io.opentelemetry.semconv.NetworkAttributes
|
||||
import play.shaded.ahc.io.netty.resolver.InetNameResolver
|
||||
import play.shaded.ahc.io.netty.util.concurrent.EventExecutor
|
||||
import play.shaded.ahc.io.netty.util.concurrent.ImmediateEventExecutor
|
||||
import play.shaded.ahc.io.netty.util.concurrent.Promise
|
||||
import play.shaded.ahc.org.asynchttpclient.AsyncHttpClient
|
||||
import play.shaded.ahc.org.asynchttpclient.AsyncHttpClientConfig
|
||||
import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClient
|
||||
import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClientConfig
|
||||
import play.shaded.ahc.org.asynchttpclient.RequestBuilderBase
|
||||
import spock.lang.Shared
|
||||
|
||||
abstract class PlayWsClientTestBaseBase<REQUEST> extends HttpClientTest<REQUEST> implements AgentTestTrait {
|
||||
@Shared
|
||||
ActorSystem system
|
||||
|
||||
@Shared
|
||||
AsyncHttpClient asyncHttpClient
|
||||
|
||||
@Shared
|
||||
AsyncHttpClient asyncHttpClientWithReadTimeout
|
||||
|
||||
@Shared
|
||||
ActorMaterializer materializer
|
||||
|
||||
def setupSpec() {
|
||||
String name = "play-ws"
|
||||
system = ActorSystem.create(name)
|
||||
ActorMaterializerSettings settings = ActorMaterializerSettings.create(system)
|
||||
materializer = ActorMaterializer.create(settings, system, name)
|
||||
|
||||
// Replace dns name resolver with custom implementation that returns only once address for each
|
||||
// host. This is needed for "connection error dropped request" because in case of connection
|
||||
// failure ahc will try the next address which isn't necessary for this test.
|
||||
RequestBuilderBase.DEFAULT_NAME_RESOLVER = new CustomNameResolver(ImmediateEventExecutor.INSTANCE)
|
||||
|
||||
asyncHttpClient = createClient(false)
|
||||
asyncHttpClientWithReadTimeout = createClient(true)
|
||||
}
|
||||
|
||||
def createClient(boolean readTimeout) {
|
||||
DefaultAsyncHttpClientConfig.Builder builder = new DefaultAsyncHttpClientConfig.Builder()
|
||||
.setMaxRequestRetry(0)
|
||||
.setShutdownQuietPeriod(0)
|
||||
.setShutdownTimeout(0)
|
||||
.setMaxRedirects(3)
|
||||
.setConnectTimeout(CONNECT_TIMEOUT_MS)
|
||||
|
||||
if (readTimeout) {
|
||||
builder.setReadTimeout(READ_TIMEOUT_MS)
|
||||
}
|
||||
|
||||
AsyncHttpClientConfig asyncHttpClientConfig =builder.build()
|
||||
return new DefaultAsyncHttpClient(asyncHttpClientConfig)
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
system?.terminate()
|
||||
asyncHttpClient?.close()
|
||||
asyncHttpClientWithReadTimeout?.close()
|
||||
}
|
||||
|
||||
@Override
|
||||
Integer responseCodeOnRedirectError() {
|
||||
// apparently play ws does not report the 302 status code
|
||||
null
|
||||
}
|
||||
|
||||
@Override
|
||||
int maxRedirects() {
|
||||
3
|
||||
}
|
||||
|
||||
@Override
|
||||
Set<AttributeKey<?>> httpAttributes(URI uri) {
|
||||
def attributes = super.httpAttributes(uri)
|
||||
attributes.remove(NetworkAttributes.NETWORK_PROTOCOL_VERSION)
|
||||
if (uri.toString().endsWith("/circular-redirect")) {
|
||||
attributes.remove(ServerAttributes.SERVER_ADDRESS)
|
||||
attributes.remove(ServerAttributes.SERVER_PORT)
|
||||
}
|
||||
return attributes
|
||||
}
|
||||
}
|
||||
|
||||
class CustomNameResolver extends InetNameResolver {
|
||||
CustomNameResolver(EventExecutor executor) {
|
||||
super(executor)
|
||||
}
|
||||
|
||||
protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception {
|
||||
try {
|
||||
promise.setSuccess(InetAddress.getByName(inetHost))
|
||||
} catch (UnknownHostException exception) {
|
||||
promise.setFailure(exception)
|
||||
}
|
||||
}
|
||||
|
||||
protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise) throws Exception {
|
||||
try {
|
||||
// default implementation calls InetAddress.getAllByName
|
||||
promise.setSuccess(Collections.singletonList(InetAddress.getByName(inetHost)))
|
||||
} catch (UnknownHostException exception) {
|
||||
promise.setFailure(exception)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import play.libs.ws.StandaloneWSClient;
|
||||
import play.libs.ws.StandaloneWSRequest;
|
||||
import play.libs.ws.StandaloneWSResponse;
|
||||
import play.libs.ws.ahc.StandaloneAhcWSClient;
|
||||
|
||||
public class PlayJavaStreamedWsClientBaseTest extends PlayWsClientBaseTest<StandaloneWSRequest> {
|
||||
|
||||
private static StandaloneWSClient wsClient;
|
||||
private static StandaloneWSClient wsClientWithReadTimeout;
|
||||
|
||||
@BeforeAll
|
||||
static void setup() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer);
|
||||
wsClientWithReadTimeout =
|
||||
new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanup() throws IOException {
|
||||
wsClient.close();
|
||||
wsClientWithReadTimeout.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
StandaloneWSRequest request = getClient(uri).url(uri.toString()).setFollowRedirects(true);
|
||||
headers.forEach(request::addHeader);
|
||||
request.setMethod(method);
|
||||
return request;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendRequest(
|
||||
StandaloneWSRequest request, String method, URI uri, Map<String, String> headers)
|
||||
throws ExecutionException, InterruptedException {
|
||||
return internalSendRequest(request).toCompletableFuture().get().getStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequestWithCallback(
|
||||
StandaloneWSRequest request,
|
||||
String method,
|
||||
URI uri,
|
||||
Map<String, String> headers,
|
||||
HttpClientResult requestResult) {
|
||||
internalSendRequest(request)
|
||||
.whenComplete(
|
||||
(response, throwable) -> {
|
||||
if (throwable != null) {
|
||||
requestResult.complete(throwable.getCause());
|
||||
} else {
|
||||
requestResult.complete(response.getStatus());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static CompletionStage<StandaloneWSResponse> internalSendRequest(
|
||||
StandaloneWSRequest request) {
|
||||
CompletionStage<? extends StandaloneWSResponse> stream = request.stream();
|
||||
// The status can be ready before the body so explicitly call wait for body to be ready
|
||||
return stream
|
||||
.thenCompose(
|
||||
response -> response.getBodyAsSource().runFold("", (acc, out) -> "", materializer))
|
||||
.thenCombine(stream, (body, response) -> response);
|
||||
}
|
||||
|
||||
private static StandaloneWSClient getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout;
|
||||
}
|
||||
return wsClient;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import play.libs.ws.StandaloneWSClient;
|
||||
import play.libs.ws.StandaloneWSRequest;
|
||||
import play.libs.ws.ahc.StandaloneAhcWSClient;
|
||||
|
||||
public class PlayJavaWsClientBaseTest extends PlayWsClientBaseTest<StandaloneWSRequest> {
|
||||
|
||||
private static StandaloneWSClient wsClient;
|
||||
private static StandaloneWSClient wsClientWithReadTimeout;
|
||||
|
||||
@BeforeAll
|
||||
static void setup() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer);
|
||||
wsClientWithReadTimeout =
|
||||
new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer);
|
||||
// autoCleanup.deferCleanup(wsClient);
|
||||
// autoCleanup.deferCleanup(wsClientWithReadTimeout);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanup() throws IOException {
|
||||
wsClient.close();
|
||||
wsClientWithReadTimeout.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers) {
|
||||
StandaloneWSRequest request = getClient(uri).url(uri.toString()).setFollowRedirects(true);
|
||||
headers.forEach(request::addHeader);
|
||||
return request.setMethod(method);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendRequest(
|
||||
StandaloneWSRequest request, String method, URI uri, Map<String, String> headers)
|
||||
throws ExecutionException, InterruptedException {
|
||||
return request.execute().toCompletableFuture().get().getStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequestWithCallback(
|
||||
StandaloneWSRequest request,
|
||||
String method,
|
||||
URI uri,
|
||||
Map<String, String> headers,
|
||||
HttpClientResult requestResult) {
|
||||
request
|
||||
.execute()
|
||||
.whenComplete(
|
||||
(response, throwable) -> {
|
||||
if (throwable != null) {
|
||||
requestResult.complete(throwable);
|
||||
} else {
|
||||
requestResult.complete(response.getStatus());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static StandaloneWSClient getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout;
|
||||
}
|
||||
return wsClient;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import play.api.libs.ws.StandaloneWSClient;
|
||||
import play.api.libs.ws.StandaloneWSRequest;
|
||||
import play.api.libs.ws.StandaloneWSResponse;
|
||||
import play.api.libs.ws.ahc.StandaloneAhcWSClient;
|
||||
import scala.Function1;
|
||||
import scala.collection.JavaConverters;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.util.Try;
|
||||
|
||||
public class PlayScalaStreamedWsClientBaseTest extends PlayWsClientBaseTest<StandaloneWSRequest> {
|
||||
|
||||
private static StandaloneWSClient wsClient;
|
||||
private static StandaloneWSClient wsClientWithReadTimeout;
|
||||
|
||||
@BeforeAll
|
||||
void setup() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer);
|
||||
wsClientWithReadTimeout =
|
||||
new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanup() throws IOException {
|
||||
wsClient.close();
|
||||
wsClientWithReadTimeout.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers)
|
||||
throws Exception {
|
||||
return getClient(uri)
|
||||
.url(uri.toURL().toString())
|
||||
.withMethod(method)
|
||||
.withFollowRedirects(true)
|
||||
.withHttpHeaders(JavaConverters.mapAsScalaMap(headers).toSeq());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendRequest(
|
||||
StandaloneWSRequest request, String method, URI uri, Map<String, String> headers)
|
||||
throws Exception {
|
||||
return Await.result(internalSendRequest(request), Duration.apply(10, TimeUnit.SECONDS))
|
||||
.status();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequestWithCallback(
|
||||
StandaloneWSRequest request,
|
||||
String method,
|
||||
URI uri,
|
||||
Map<String, String> headers,
|
||||
HttpClientResult requestResult) {
|
||||
internalSendRequest(request)
|
||||
.onComplete(
|
||||
new Function1<Try<StandaloneWSResponse>, Void>() {
|
||||
@Override
|
||||
public Void apply(Try<StandaloneWSResponse> response) {
|
||||
if (response.isSuccess()) {
|
||||
requestResult.complete(response.get().status());
|
||||
} else {
|
||||
requestResult.complete(response.failed().get());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
},
|
||||
ExecutionContext.global());
|
||||
}
|
||||
|
||||
private static Future<StandaloneWSResponse> internalSendRequest(StandaloneWSRequest request) {
|
||||
Future<StandaloneWSResponse> futureResponse = request.stream();
|
||||
// The status can be ready before the body so explicitly call wait for body to be ready
|
||||
Future<String> bodyResponse =
|
||||
futureResponse.flatMap(
|
||||
new Function1<StandaloneWSResponse, Future<String>>() {
|
||||
@Override
|
||||
public Future<String> apply(StandaloneWSResponse wsResponse) {
|
||||
return wsResponse.bodyAsSource().runFold("", (acc, out) -> "", materializer);
|
||||
}
|
||||
},
|
||||
ExecutionContext.global());
|
||||
return bodyResponse.flatMap(
|
||||
new Function1<String, Future<StandaloneWSResponse>>() {
|
||||
@Override
|
||||
public Future<StandaloneWSResponse> apply(String v1) {
|
||||
return futureResponse;
|
||||
}
|
||||
},
|
||||
ExecutionContext.global());
|
||||
}
|
||||
|
||||
private static StandaloneWSClient getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout;
|
||||
}
|
||||
return wsClient;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws;
|
||||
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import play.api.libs.ws.StandaloneWSClient;
|
||||
import play.api.libs.ws.StandaloneWSRequest;
|
||||
import play.api.libs.ws.StandaloneWSResponse;
|
||||
import play.api.libs.ws.ahc.StandaloneAhcWSClient;
|
||||
import scala.Function1;
|
||||
import scala.collection.JavaConverters;
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.ExecutionContext;
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.Duration;
|
||||
import scala.util.Try;
|
||||
|
||||
public class PlayScalaWsClientBaseTest extends PlayWsClientBaseTest<StandaloneWSRequest> {
|
||||
|
||||
private static StandaloneWSClient wsClient;
|
||||
private static StandaloneWSClient wsClientWithReadTimeout;
|
||||
|
||||
@BeforeAll
|
||||
void setup() {
|
||||
wsClient = new StandaloneAhcWSClient(asyncHttpClient, materializer);
|
||||
wsClientWithReadTimeout =
|
||||
new StandaloneAhcWSClient(asyncHttpClientWithReadTimeout, materializer);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanup() throws IOException {
|
||||
wsClient.close();
|
||||
wsClientWithReadTimeout.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StandaloneWSRequest buildRequest(String method, URI uri, Map<String, String> headers)
|
||||
throws MalformedURLException {
|
||||
return getClient(uri)
|
||||
.url(uri.toURL().toString())
|
||||
.withMethod(method)
|
||||
.withFollowRedirects(true)
|
||||
.withHttpHeaders(JavaConverters.mapAsScalaMap(headers).toSeq());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int sendRequest(
|
||||
StandaloneWSRequest request, String method, URI uri, Map<String, String> headers)
|
||||
throws Exception {
|
||||
Future<StandaloneWSResponse> futureResponse = request.execute();
|
||||
Await.ready(futureResponse, Duration.apply(10, TimeUnit.SECONDS));
|
||||
Try<StandaloneWSResponse> value = futureResponse.value().get();
|
||||
if (value.isSuccess()) {
|
||||
return value.get().status();
|
||||
}
|
||||
// Catch the Throwable and rethrow it
|
||||
throw (Exception) value.failed().get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendRequestWithCallback(
|
||||
StandaloneWSRequest request,
|
||||
String method,
|
||||
URI uri,
|
||||
Map<String, String> headers,
|
||||
HttpClientResult requestResult) {
|
||||
request
|
||||
.execute()
|
||||
.onComplete(
|
||||
new Function1<Try<StandaloneWSResponse>, Void>() {
|
||||
@Override
|
||||
public Void apply(Try<StandaloneWSResponse> response) {
|
||||
if (response.isSuccess()) {
|
||||
requestResult.complete(response.get().status());
|
||||
} else {
|
||||
requestResult.complete(response.failed().get());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
},
|
||||
ExecutionContext.global());
|
||||
}
|
||||
|
||||
private static StandaloneWSClient getClient(URI uri) {
|
||||
if (uri.toString().contains("/read-timeout")) {
|
||||
return wsClientWithReadTimeout;
|
||||
}
|
||||
return wsClient;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,134 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.playws;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.ActorMaterializer;
|
||||
import akka.stream.ActorMaterializerSettings;
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
|
||||
import io.opentelemetry.semconv.NetworkAttributes;
|
||||
import io.opentelemetry.semconv.ServerAttributes;
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import play.shaded.ahc.io.netty.resolver.InetNameResolver;
|
||||
import play.shaded.ahc.io.netty.util.concurrent.EventExecutor;
|
||||
import play.shaded.ahc.io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
import play.shaded.ahc.io.netty.util.concurrent.Promise;
|
||||
import play.shaded.ahc.org.asynchttpclient.AsyncHttpClient;
|
||||
import play.shaded.ahc.org.asynchttpclient.AsyncHttpClientConfig;
|
||||
import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClient;
|
||||
import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClientConfig;
|
||||
import play.shaded.ahc.org.asynchttpclient.RequestBuilderBase;
|
||||
|
||||
abstract class PlayWsClientBaseTest<REQUEST> extends AbstractHttpClientTest<REQUEST> {
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();
|
||||
|
||||
private static ActorSystem system;
|
||||
protected static AsyncHttpClient asyncHttpClient;
|
||||
protected static AsyncHttpClient asyncHttpClientWithReadTimeout;
|
||||
protected static ActorMaterializer materializer;
|
||||
|
||||
@BeforeAll
|
||||
static void setupHttpClient() {
|
||||
String name = "play-ws";
|
||||
system = ActorSystem.create(name);
|
||||
materializer = ActorMaterializer.create(ActorMaterializerSettings.create(system), system, name);
|
||||
|
||||
// Replace dns name resolver with custom implementation that returns only once address for each
|
||||
// host. This is needed for "connection error dropped request" because in case of connection
|
||||
// failure ahc will try the next address which isn't necessary for this test.
|
||||
RequestBuilderBase.DEFAULT_NAME_RESOLVER =
|
||||
new CustomNameResolver(ImmediateEventExecutor.INSTANCE);
|
||||
|
||||
asyncHttpClient = createClient(false);
|
||||
asyncHttpClientWithReadTimeout = createClient(true);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void cleanupHttpClient() throws IOException {
|
||||
asyncHttpClient.close();
|
||||
asyncHttpClientWithReadTimeout.close();
|
||||
system.terminate();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
|
||||
super.configure(optionsBuilder);
|
||||
// apparently play ws does not report the 302 status code
|
||||
optionsBuilder.setResponseCodeOnRedirectError(null);
|
||||
optionsBuilder.setMaxRedirects(3);
|
||||
optionsBuilder.spanEndsAfterBody();
|
||||
optionsBuilder.setHttpAttributes(
|
||||
uri -> {
|
||||
Set<AttributeKey<?>> attributes =
|
||||
new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES);
|
||||
attributes.remove(NetworkAttributes.NETWORK_PROTOCOL_VERSION);
|
||||
if (uri.toString().endsWith("/circular-redirect")) {
|
||||
attributes.remove(ServerAttributes.SERVER_ADDRESS);
|
||||
attributes.remove(ServerAttributes.SERVER_PORT);
|
||||
}
|
||||
return attributes;
|
||||
});
|
||||
}
|
||||
|
||||
private static AsyncHttpClient createClient(boolean readTimeout) {
|
||||
DefaultAsyncHttpClientConfig.Builder builder =
|
||||
new DefaultAsyncHttpClientConfig.Builder()
|
||||
.setMaxRequestRetry(0)
|
||||
.setShutdownQuietPeriod(0)
|
||||
.setShutdownTimeout(0)
|
||||
.setMaxRedirects(3)
|
||||
.setConnectTimeout(5000);
|
||||
|
||||
if (readTimeout) {
|
||||
builder.setReadTimeout(2000);
|
||||
}
|
||||
|
||||
AsyncHttpClientConfig asyncHttpClientConfig = builder.build();
|
||||
return new DefaultAsyncHttpClient(asyncHttpClientConfig);
|
||||
}
|
||||
|
||||
private static class CustomNameResolver extends InetNameResolver {
|
||||
|
||||
public CustomNameResolver(EventExecutor executor) {
|
||||
super(executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception {
|
||||
try {
|
||||
promise.setSuccess(InetAddress.getByName(inetHost));
|
||||
} catch (UnknownHostException exception) {
|
||||
promise.setFailure(exception);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise)
|
||||
throws Exception {
|
||||
try {
|
||||
// default implementation calls InetAddress.getAllByName
|
||||
promise.setSuccess(Collections.singletonList(InetAddress.getByName(inetHost)));
|
||||
} catch (UnknownHostException exception) {
|
||||
promise.setFailure(exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue