api: Add ClientStreamTracer.inboundHeaders(Metadata)

This will be used by the metadata exchange of CSM. When recording
per-attempt metrics, we really need per-attempt data and can't leverage
ClientInterceptors.
This commit is contained in:
Eric Anderson 2024-05-13 16:04:26 -07:00
parent fea577c804
commit 960012d76e
7 changed files with 31 additions and 8 deletions

View File

@ -70,10 +70,23 @@ public abstract class ClientStreamTracer extends StreamTracer {
}
/**
* Trailing metadata has been received from the server.
* Headers has been received from the server. This method does not pass ownership to {@code
* headers}, so implementations must not access the metadata after returning. Modifications to the
* metadata within this method will be seen by interceptors and the application.
*
* @param trailers the mutable trailing metadata. Modifications to it will be seen by
* interceptors and the application.
* @param headers the received header metadata
*/
public void inboundHeaders(Metadata headers) {
inboundHeaders();
}
/**
* Trailing metadata has been received from the server. This method does not pass ownership to
* {@code trailers}, so implementations must not access the metadata after returning.
* Modifications to the metadata within this method will be seen by interceptors and the
* application.
*
* @param trailers the received trailing metadata
* @since 1.17.0
*/
public void inboundTrailers(Metadata trailers) {

View File

@ -579,7 +579,7 @@ abstract class Inbound<L extends StreamListener> implements StreamListener.Messa
@GuardedBy("this")
protected void handlePrefix(int flags, Parcel parcel) throws StatusException {
Metadata headers = MetadataHelper.readMetadata(parcel, attributes);
statsTraceContext.clientInboundHeaders();
statsTraceContext.clientInboundHeaders(headers);
listener.headersRead(headers);
}

View File

@ -304,7 +304,7 @@ public abstract class AbstractClientStream extends AbstractStream
*/
protected void inboundHeadersReceived(Metadata headers) {
checkState(!statusReported, "Received headers on closed stream");
statsTraceCtx.clientInboundHeaders();
statsTraceCtx.clientInboundHeaders(headers);
boolean compressedStream = false;
String streamEncoding = headers.get(CONTENT_ENCODING_KEY);

View File

@ -49,6 +49,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer {
delegate().inboundHeaders();
}
@Override
public void inboundHeaders(Metadata headers) {
delegate().inboundHeaders(headers);
}
@Override
public void inboundTrailers(Metadata trailers) {
delegate().inboundTrailers(trailers);

View File

@ -101,9 +101,9 @@ public final class StatsTraceContext {
*
* <p>Called from abstract stream implementations.
*/
public void clientInboundHeaders() {
public void clientInboundHeaders(Metadata headers) {
for (StreamTracer tracer : tracers) {
((ClientStreamTracer) tracer).inboundHeaders();
((ClientStreamTracer) tracer).inboundHeaders(headers);
}
}

View File

@ -571,7 +571,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
return;
}
clientStream.statsTraceCtx.clientInboundHeaders();
clientStream.statsTraceCtx.clientInboundHeaders(headers);
syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
}
syncContext.drain();

View File

@ -48,6 +48,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer {
delegate().inboundHeaders();
}
@Override
public void inboundHeaders(Metadata headers) {
delegate().inboundHeaders(headers);
}
@Override
public void inboundTrailers(Metadata trailers) {
delegate().inboundTrailers(trailers);