Implement HttpServerResponseCustomizer support for Akka-http (#8273)
This commit is contained in:
parent
2dfe64a08b
commit
56eae8109b
|
@ -9,6 +9,7 @@ import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentCo
|
||||||
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse;
|
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.errorResponse;
|
||||||
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter;
|
import static io.opentelemetry.javaagent.instrumentation.akkahttp.server.AkkaHttpServerSingletons.instrumenter;
|
||||||
|
|
||||||
|
import akka.http.javadsl.model.HttpHeader;
|
||||||
import akka.http.scaladsl.model.HttpRequest;
|
import akka.http.scaladsl.model.HttpRequest;
|
||||||
import akka.http.scaladsl.model.HttpResponse;
|
import akka.http.scaladsl.model.HttpResponse;
|
||||||
import akka.stream.Attributes;
|
import akka.stream.Attributes;
|
||||||
|
@ -22,8 +23,10 @@ import akka.stream.stage.GraphStage;
|
||||||
import akka.stream.stage.GraphStageLogic;
|
import akka.stream.stage.GraphStageLogic;
|
||||||
import io.opentelemetry.context.Context;
|
import io.opentelemetry.context.Context;
|
||||||
import io.opentelemetry.context.Scope;
|
import io.opentelemetry.context.Scope;
|
||||||
|
import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseCustomizerHolder;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Deque;
|
import java.util.Deque;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class AkkaFlowWrapper
|
public class AkkaFlowWrapper
|
||||||
extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
|
extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
|
||||||
|
@ -134,6 +137,17 @@ public class AkkaFlowWrapper
|
||||||
// this may happen on a different thread from the one that opened the scope
|
// this may happen on a different thread from the one that opened the scope
|
||||||
// actor instrumentation will take care of the leaked scopes
|
// actor instrumentation will take care of the leaked scopes
|
||||||
tracingRequest.scope.close();
|
tracingRequest.scope.close();
|
||||||
|
|
||||||
|
// akka response is immutable so the customizer just captures the added headers
|
||||||
|
AkkaHttpResponseMutator responseMutator = new AkkaHttpResponseMutator();
|
||||||
|
HttpServerResponseCustomizerHolder.getCustomizer()
|
||||||
|
.customize(tracingRequest.context, response, responseMutator);
|
||||||
|
// build a new response with the added headers
|
||||||
|
List<HttpHeader> headers = responseMutator.getHeaders();
|
||||||
|
if (!headers.isEmpty()) {
|
||||||
|
response = (HttpResponse) response.addHeaders(headers);
|
||||||
|
}
|
||||||
|
|
||||||
instrumenter().end(tracingRequest.context, tracingRequest.request, response, null);
|
instrumenter().end(tracingRequest.context, tracingRequest.request, response, null);
|
||||||
}
|
}
|
||||||
push(responseOut, response);
|
push(responseOut, response);
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*
|
||||||
|
* Copyright The OpenTelemetry Authors
|
||||||
|
* SPDX-License-Identifier: Apache-2.0
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.opentelemetry.javaagent.instrumentation.akkahttp.server;
|
||||||
|
|
||||||
|
import akka.http.javadsl.model.HttpHeader;
|
||||||
|
import akka.http.javadsl.model.HttpResponse;
|
||||||
|
import akka.http.javadsl.model.headers.RawHeader;
|
||||||
|
import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseMutator;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
final class AkkaHttpResponseMutator implements HttpServerResponseMutator<HttpResponse> {
|
||||||
|
|
||||||
|
private final List<HttpHeader> headers = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void appendHeader(HttpResponse response, String name, String value) {
|
||||||
|
headers.add(RawHeader.create(name, value));
|
||||||
|
}
|
||||||
|
|
||||||
|
List<HttpHeader> getHeaders() {
|
||||||
|
return headers;
|
||||||
|
}
|
||||||
|
}
|
|
@ -14,7 +14,7 @@ import io.opentelemetry.instrumentation.testing.junit.http.{
|
||||||
|
|
||||||
import java.util
|
import java.util
|
||||||
import java.util.Collections
|
import java.util.Collections
|
||||||
import java.util.function.Function
|
import java.util.function.{Function, Predicate}
|
||||||
|
|
||||||
abstract class AbstractHttpServerInstrumentationTest
|
abstract class AbstractHttpServerInstrumentationTest
|
||||||
extends AbstractHttpServerTest[Object] {
|
extends AbstractHttpServerTest[Object] {
|
||||||
|
@ -29,5 +29,11 @@ abstract class AbstractHttpServerInstrumentationTest
|
||||||
Collections.emptySet()
|
Collections.emptySet()
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
options.setHasResponseCustomizer(
|
||||||
|
new Predicate[ServerEndpoint] {
|
||||||
|
override def test(t: ServerEndpoint): Boolean =
|
||||||
|
t != ServerEndpoint.EXCEPTION
|
||||||
|
}
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue