Rewrite netty 3.8 tests to Java (#7666)

Part of #7195
This commit is contained in:
Mateusz Rzeszutek 2023-01-30 08:46:05 +01:00 committed by GitHub
parent bf0e20e1db
commit 245a9f7cf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 392 additions and 318 deletions

View File

@ -1,147 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.ning.http.client.AsyncCompletionHandler
import com.ning.http.client.AsyncHttpClient
import com.ning.http.client.AsyncHttpClientConfig
import com.ning.http.client.Request
import com.ning.http.client.RequestBuilder
import com.ning.http.client.Response
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.context.Context
import io.opentelemetry.context.Scope
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpClientTest
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import spock.lang.AutoCleanup
import spock.lang.Shared
import java.nio.channels.ClosedChannelException
class Netty38ClientTest extends HttpClientTest<Request> implements AgentTestTrait {
@Shared
@AutoCleanup
AsyncHttpClient client = new AsyncHttpClient(getClientConfig())
def getClientConfig() {
def builder = new AsyncHttpClientConfig.Builder()
.setUserAgent("test-user-agent")
if (builder.metaClass.getMetaMethod("setConnectTimeout", int) != null) {
builder.setConnectTimeout(CONNECT_TIMEOUT_MS)
} else {
builder.setRequestTimeoutInMs(CONNECT_TIMEOUT_MS)
}
if (builder.metaClass.getMetaMethod("setFollowRedirect", boolean) != null) {
builder.setFollowRedirect(true)
} else {
builder.setFollowRedirects(true)
}
if (builder.metaClass.getMetaMethod("setMaxRedirects", int) != null) {
builder.setMaxRedirects(3)
} else {
builder.setMaximumNumberOfRedirects(3)
}
// with connection pooling is enabled there are occasional failures in high concurrency test
if (builder.metaClass.getMetaMethod("setAllowPoolingConnections", boolean) != null) {
builder.setAllowPoolingConnections(false)
} else {
builder.setAllowPoolingConnection(false)
}
return builder.build()
}
@Override
Request buildRequest(String method, URI uri, Map<String, String> headers) {
def requestBuilder = new RequestBuilder(method)
.setUrl(uri.toString())
headers.entrySet().each {
requestBuilder.addHeader(it.key, it.value)
}
return requestBuilder.build()
}
@Override
int sendRequest(Request request, String method, URI uri, Map<String, String> headers) {
return client.executeRequest(request).get().statusCode
}
@Override
void sendRequestWithCallback(Request request, String method, URI uri, Map<String, String> headers, HttpClientResult requestResult) {
// TODO: context is not automatically propagated into callbacks
Context context = Context.current()
// TODO(anuraaga): Do we also need to test ListenableFuture callback?
client.executeRequest(request, new AsyncCompletionHandler<Void>() {
@Override
Void onCompleted(Response response) throws Exception {
try (Scope scope = context.makeCurrent()) {
requestResult.complete(response.statusCode)
}
return null
}
@Override
void onThrowable(Throwable throwable) {
try (Scope scope = context.makeCurrent()) {
requestResult.complete(throwable)
}
}
})
}
@Override
String userAgent() {
return "test-user-agent"
}
@Override
String expectedClientSpanName(URI uri, String method) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "http://192.0.2.1/": // non routable address
return "CONNECT"
default:
return super.expectedClientSpanName(uri, method)
}
}
@Override
Throwable clientSpanError(URI uri, Throwable exception) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
exception = exception.getCause() != null ? exception.getCause() : new ConnectException("Connection refused: localhost/127.0.0.1:61")
break
case "http://192.0.2.1/": // non routable address
exception = exception.getCause() != null ? exception.getCause() : new ClosedChannelException()
}
return exception
}
@Override
Set<AttributeKey<?>> httpAttributes(URI uri) {
switch (uri.toString()) {
case "http://localhost:61/": // unopened port
case "http://192.0.2.1/": // non routable address
return []
}
def attributes = super.httpAttributes(uri)
attributes.remove(SemanticAttributes.NET_PEER_NAME)
attributes.remove(SemanticAttributes.NET_PEER_PORT)
return attributes
}
@Override
boolean testRedirects() {
false
}
@Override
boolean testHttps() {
false
}
}

View File

@ -1,171 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.instrumentation.test.base.HttpServerTest
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.buffer.ChannelBuffers
import org.jboss.netty.channel.ChannelHandlerContext
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.ChannelPipelineFactory
import org.jboss.netty.channel.DefaultChannelPipeline
import org.jboss.netty.channel.DownstreamMessageEvent
import org.jboss.netty.channel.ExceptionEvent
import org.jboss.netty.channel.FailedChannelFuture
import org.jboss.netty.channel.MessageEvent
import org.jboss.netty.channel.SimpleChannelHandler
import org.jboss.netty.channel.SucceededChannelFuture
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.http.DefaultHttpResponse
import org.jboss.netty.handler.codec.http.HttpRequest
import org.jboss.netty.handler.codec.http.HttpResponse
import org.jboss.netty.handler.codec.http.HttpResponseStatus
import org.jboss.netty.handler.codec.http.HttpServerCodec
import org.jboss.netty.handler.codec.http.QueryStringDecoder
import org.jboss.netty.handler.logging.LoggingHandler
import org.jboss.netty.logging.InternalLogLevel
import org.jboss.netty.logging.InternalLoggerFactory
import org.jboss.netty.logging.Slf4JLoggerFactory
import org.jboss.netty.util.CharsetUtil
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.CAPTURE_HEADERS
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.ERROR
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.EXCEPTION
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.INDEXED_CHILD
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.NOT_FOUND
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.QUERY_PARAM
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.REDIRECT
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.SUCCESS
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.forPath
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.LOCATION
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1
class Netty38ServerTest extends HttpServerTest<ServerBootstrap> implements AgentTestTrait {
static final LoggingHandler LOGGING_HANDLER
static {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
LOGGING_HANDLER = new LoggingHandler(SERVER_LOGGER.name, InternalLogLevel.DEBUG, true)
}
ChannelPipeline channelPipeline() {
ChannelPipeline channelPipeline = new DefaultChannelPipeline()
channelPipeline.addFirst("logger", LOGGING_HANDLER)
channelPipeline.addLast("http-codec", new HttpServerCodec())
channelPipeline.addLast("controller", new SimpleChannelHandler() {
@Override
void messageReceived(ChannelHandlerContext ctx, MessageEvent msg) throws Exception {
if (msg.getMessage() instanceof HttpRequest) {
def request = msg.getMessage() as HttpRequest
def uri = URI.create(request.getUri())
ServerEndpoint endpoint = forPath(uri.path)
ctx.sendDownstream controller(endpoint) {
HttpResponse response
ChannelBuffer responseContent = null
switch (endpoint) {
case SUCCESS:
case ERROR:
responseContent = ChannelBuffers.copiedBuffer(endpoint.body, CharsetUtil.UTF_8)
response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status))
response.setContent(responseContent)
break
case INDEXED_CHILD:
responseContent = ChannelBuffers.EMPTY_BUFFER
endpoint.collectSpanAttributes { new QueryStringDecoder(uri).getParameters().get(it).find() }
response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status))
response.setContent(responseContent)
break
case QUERY_PARAM:
responseContent = ChannelBuffers.copiedBuffer(uri.query, CharsetUtil.UTF_8)
response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status))
response.setContent(responseContent)
break
case REDIRECT:
responseContent = ChannelBuffers.EMPTY_BUFFER
response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status))
response.setContent(responseContent)
response.headers().set(LOCATION, endpoint.body)
break
case CAPTURE_HEADERS:
responseContent = ChannelBuffers.copiedBuffer(endpoint.body, CharsetUtil.UTF_8)
response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status))
response.headers().set("X-Test-Response", request.headers().get("X-Test-Request"))
response.setContent(responseContent)
break
case EXCEPTION:
throw new Exception(endpoint.body)
default:
responseContent = ChannelBuffers.copiedBuffer(NOT_FOUND.body, CharsetUtil.UTF_8)
response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.valueOf(endpoint.status))
response.setContent(responseContent)
break
}
response.headers().set(CONTENT_TYPE, "text/plain")
if (responseContent) {
response.headers().set(CONTENT_LENGTH, responseContent.readableBytes())
}
return new DownstreamMessageEvent(
ctx.getChannel(),
new SucceededChannelFuture(ctx.getChannel()),
response,
ctx.getChannel().getRemoteAddress())
}
}
}
@Override
void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent ex) throws Exception {
def message = ex.cause == null ? "<no cause> " + ex.message : ex.cause.message == null ? "<null>" : ex.cause.message
ChannelBuffer buffer = ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR)
response.setContent(buffer)
response.headers().set(CONTENT_TYPE, "text/plain")
response.headers().set(CONTENT_LENGTH, buffer.readableBytes())
ctx.sendDownstream(new DownstreamMessageEvent(
ctx.getChannel(),
new FailedChannelFuture(ctx.getChannel(), ex.getCause()),
response,
ctx.getChannel().getRemoteAddress()))
}
})
return channelPipeline
}
@Override
ServerBootstrap startServer(int port) {
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory())
bootstrap.setParentHandler(LOGGING_HANDLER)
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
ChannelPipeline getPipeline() throws Exception {
return channelPipeline()
}
})
InetSocketAddress address = new InetSocketAddress(port)
bootstrap.bind(address)
return bootstrap
}
@Override
void stopServer(ServerBootstrap server) {
server?.shutdown()
}
@Override
Set<AttributeKey<?>> httpAttributes(ServerEndpoint endpoint) {
def attributes = super.httpAttributes(endpoint)
attributes.remove(SemanticAttributes.HTTP_ROUTE)
attributes
}
}

View File

@ -0,0 +1,181 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v3_8.client;
import static java.util.Collections.emptySet;
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Request;
import com.ning.http.client.RequestBuilder;
import com.ning.http.client.Response;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpClientTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientResult;
import io.opentelemetry.instrumentation.testing.junit.http.HttpClientTestOptions;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
class Netty38ClientTest extends AbstractHttpClientTest<Request> {
@RegisterExtension
static final InstrumentationExtension testing = HttpClientInstrumentationExtension.forAgent();
static final String USER_AGENT = "test-user-agent";
AsyncHttpClient client;
@BeforeEach
void setUp() throws Exception {
AsyncHttpClientConfig.Builder builder =
new AsyncHttpClientConfig.Builder().setUserAgent(USER_AGENT);
Method setConnectTimeout;
try {
setConnectTimeout =
AsyncHttpClientConfig.Builder.class.getMethod("setConnectTimeout", int.class);
} catch (NoSuchMethodException e) {
setConnectTimeout =
AsyncHttpClientConfig.Builder.class.getMethod("setRequestTimeoutInMs", int.class);
}
setConnectTimeout.invoke(builder, (int) CONNECTION_TIMEOUT.toMillis());
Method setFollowRedirect;
try {
setFollowRedirect =
AsyncHttpClientConfig.Builder.class.getMethod("setFollowRedirect", boolean.class);
} catch (NoSuchMethodException e) {
setFollowRedirect =
AsyncHttpClientConfig.Builder.class.getMethod("setFollowRedirects", boolean.class);
}
setFollowRedirect.invoke(builder, true);
Method setMaxRedirects;
try {
setMaxRedirects = AsyncHttpClientConfig.Builder.class.getMethod("setMaxRedirects", int.class);
} catch (NoSuchMethodException e) {
setMaxRedirects =
AsyncHttpClientConfig.Builder.class.getMethod("setMaximumNumberOfRedirects", int.class);
}
setMaxRedirects.invoke(builder, 3);
Method setAllowPoolingConnections;
try {
setAllowPoolingConnections =
AsyncHttpClientConfig.Builder.class.getMethod(
"setAllowPoolingConnections", boolean.class);
} catch (NoSuchMethodException e) {
setAllowPoolingConnections =
AsyncHttpClientConfig.Builder.class.getMethod("setAllowPoolingConnection", boolean.class);
}
setAllowPoolingConnections.invoke(builder, false);
client = new AsyncHttpClient(builder.build());
}
@Override
public Request buildRequest(String method, URI uri, Map<String, String> headers) {
RequestBuilder requestBuilder = new RequestBuilder(method).setUrl(uri.toString());
headers.forEach(requestBuilder::addHeader);
return requestBuilder.build();
}
@Override
public int sendRequest(Request request, String method, URI uri, Map<String, String> headers)
throws Exception {
return client.executeRequest(request).get().getStatusCode();
}
@Override
public void sendRequestWithCallback(
Request request,
String method,
URI uri,
Map<String, String> headers,
HttpClientResult httpClientResult)
throws Exception {
// TODO: context is not automatically propagated into callbacks
Context context = Context.current();
// TODO(anuraaga): Do we also need to test ListenableFuture callback?
client.executeRequest(
request,
new AsyncCompletionHandler<Void>() {
@Override
public Void onCompleted(Response response) {
try (Scope ignored = context.makeCurrent()) {
httpClientResult.complete(response.getStatusCode());
}
return null;
}
@Override
public void onThrowable(Throwable throwable) {
try (Scope ignored = context.makeCurrent()) {
httpClientResult.complete(throwable);
}
}
});
}
@Override
protected void configure(HttpClientTestOptions.Builder optionsBuilder) {
optionsBuilder.disableTestRedirects();
optionsBuilder.disableTestHttps();
optionsBuilder.setUserAgent(USER_AGENT);
optionsBuilder.setExpectedClientSpanNameMapper(
(uri, method) -> {
// unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString())
|| "http://192.0.2.1/".equals(uri.toString())) {
return "CONNECT";
}
return HttpClientTestOptions.DEFAULT_EXPECTED_CLIENT_SPAN_NAME_MAPPER.apply(uri, method);
});
optionsBuilder.setClientSpanErrorMapper(
(uri, error) -> {
if ("http://localhost:61/".equals(uri.toString())) { // unopened port
error =
error.getCause() != null
? error.getCause()
: new ConnectException("Connection refused: localhost/127.0.0.1:61");
} else if ("http://192.0.2.1/".equals(uri.toString())) { // non routable address
error = error.getCause() != null ? error.getCause() : new ClosedChannelException();
}
return error;
});
optionsBuilder.setHttpAttributes(
uri -> {
// unopened port or non routable address
if ("http://localhost:61/".equals(uri.toString())
|| "http://192.0.2.1/".equals(uri.toString())) {
return emptySet();
}
Set<AttributeKey<?>> attributes =
new HashSet<>(HttpClientTestOptions.DEFAULT_HTTP_ATTRIBUTES);
attributes.remove(SemanticAttributes.NET_PEER_NAME);
attributes.remove(SemanticAttributes.NET_PEER_PORT);
return attributes;
});
}
}

View File

@ -0,0 +1,211 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.netty.v3_8.server;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.NOT_FOUND;
import static io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint.forPath;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.LOCATION;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerTest;
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerTestOptions;
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.DefaultChannelPipeline;
import org.jboss.netty.channel.DownstreamMessageEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.FailedChannelFuture;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.SucceededChannelFuture;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpServerCodec;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.logging.LoggingHandler;
import org.jboss.netty.logging.InternalLogLevel;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.jboss.netty.util.CharsetUtil;
import org.junit.jupiter.api.extension.RegisterExtension;
class Netty38ServerTest extends AbstractHttpServerTest<ServerBootstrap> {
@RegisterExtension
static final InstrumentationExtension testing = HttpServerInstrumentationExtension.forAgent();
static final LoggingHandler LOGGING_HANDLER;
static {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
LOGGING_HANDLER =
new LoggingHandler(Netty38ServerTest.class.getName(), InternalLogLevel.DEBUG, true);
}
@Override
protected ServerBootstrap setupServer() {
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory());
bootstrap.setParentHandler(LOGGING_HANDLER);
bootstrap.setPipelineFactory(Netty38ServerTest::channelPipeline);
InetSocketAddress address = new InetSocketAddress(port);
bootstrap.bind(address);
return bootstrap;
}
@Override
protected void stopServer(ServerBootstrap server) {
server.shutdown();
}
@Override
protected void configure(HttpServerTestOptions options) {
options.setHttpAttributes(
serverEndpoint -> {
Set<AttributeKey<?>> attributes =
new HashSet<>(HttpServerTestOptions.DEFAULT_HTTP_ATTRIBUTES);
attributes.remove(SemanticAttributes.HTTP_ROUTE);
return attributes;
});
options.setExpectedException(new IllegalArgumentException(ServerEndpoint.EXCEPTION.getBody()));
}
private static ChannelPipeline channelPipeline() {
ChannelPipeline channelPipeline = new DefaultChannelPipeline();
channelPipeline.addFirst("logger", LOGGING_HANDLER);
channelPipeline.addLast("http-codec", new HttpServerCodec());
channelPipeline.addLast(
"controller",
new SimpleChannelHandler() {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent msg) {
if (msg.getMessage() instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg.getMessage();
URI uri = URI.create(request.getUri());
ServerEndpoint endpoint = forPath(uri.getPath());
ctx.sendDownstream(
controller(
endpoint,
() -> {
HttpResponse response;
ChannelBuffer responseContent;
switch (endpoint) {
case SUCCESS:
case ERROR:
responseContent =
ChannelBuffers.copiedBuffer(endpoint.getBody(), CharsetUtil.UTF_8);
response =
new DefaultHttpResponse(
HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()));
response.setContent(responseContent);
break;
case INDEXED_CHILD:
responseContent = ChannelBuffers.EMPTY_BUFFER;
endpoint.collectSpanAttributes(
name ->
new QueryStringDecoder(uri)
.getParameters().get(name).stream()
.findFirst()
.orElse(null));
response =
new DefaultHttpResponse(
HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()));
response.setContent(responseContent);
break;
case QUERY_PARAM:
responseContent =
ChannelBuffers.copiedBuffer(uri.getQuery(), CharsetUtil.UTF_8);
response =
new DefaultHttpResponse(
HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()));
response.setContent(responseContent);
break;
case REDIRECT:
responseContent = ChannelBuffers.EMPTY_BUFFER;
response =
new DefaultHttpResponse(
HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()));
response.setContent(responseContent);
response.headers().set(LOCATION, endpoint.getBody());
break;
case CAPTURE_HEADERS:
responseContent =
ChannelBuffers.copiedBuffer(endpoint.getBody(), CharsetUtil.UTF_8);
response =
new DefaultHttpResponse(
HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()));
response
.headers()
.set("X-Test-Response", request.headers().get("X-Test-Request"));
response.setContent(responseContent);
break;
case EXCEPTION:
throw new IllegalArgumentException(endpoint.getBody());
default:
responseContent =
ChannelBuffers.copiedBuffer(NOT_FOUND.getBody(), CharsetUtil.UTF_8);
response =
new DefaultHttpResponse(
HTTP_1_1, HttpResponseStatus.valueOf(endpoint.getStatus()));
response.setContent(responseContent);
break;
}
response.headers().set(CONTENT_TYPE, "text/plain");
if (responseContent != null) {
response.headers().set(CONTENT_LENGTH, responseContent.readableBytes());
}
return new DownstreamMessageEvent(
ctx.getChannel(),
new SucceededChannelFuture(ctx.getChannel()),
response,
ctx.getChannel().getRemoteAddress());
}));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent ex) {
String message =
ex.getCause() == null
? "<no cause>"
: ex.getCause().getMessage() == null ? "<null>" : ex.getCause().getMessage();
ChannelBuffer buffer = ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8);
HttpResponse response =
new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
response.setContent(buffer);
response.headers().set(CONTENT_TYPE, "text/plain");
response.headers().set(CONTENT_LENGTH, buffer.readableBytes());
ctx.sendDownstream(
new DownstreamMessageEvent(
ctx.getChannel(),
new FailedChannelFuture(ctx.getChannel(), ex.getCause()),
response,
ctx.getChannel().getRemoteAddress()));
}
});
return channelPipeline;
}
}