diff --git a/services/src/main/java/io/grpc/services/BinlogHelper.java b/services/src/main/java/io/grpc/services/BinlogHelper.java index 907c488a24..566b63e94a 100644 --- a/services/src/main/java/io/grpc/services/BinlogHelper.java +++ b/services/src/main/java/io/grpc/services/BinlogHelper.java @@ -62,6 +62,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -76,8 +77,6 @@ final class BinlogHelper { private static final boolean SERVER = true; private static final boolean CLIENT = false; - @VisibleForTesting - static final CallId emptyCallId = new CallId(0, 0); @VisibleForTesting static final SocketAddress DUMMY_SOCKET = new DummySocketAddress(); @VisibleForTesting @@ -104,8 +103,9 @@ final class BinlogHelper { } @Override - void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId) { + void logSendInitialMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) { GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(Type.SEND_INITIAL_METADATA) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)); @@ -115,8 +115,9 @@ final class BinlogHelper { @Override void logRecvInitialMetadata( - Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) { + int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) { GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(Type.RECV_INITIAL_METADATA) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)) @@ -126,8 +127,9 @@ final class BinlogHelper { } @Override - void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId) { + void logTrailingMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) { GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)); @@ -137,6 +139,7 @@ final class BinlogHelper { @Override void logOutboundMessage( + int seq, Marshaller marshaller, T message, boolean compressed, @@ -146,6 +149,7 @@ final class BinlogHelper { throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); } GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(Type.SEND_MESSAGE) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)); @@ -155,6 +159,7 @@ final class BinlogHelper { @Override void logInboundMessage( + int seq, Marshaller marshaller, T message, boolean compressed, @@ -164,6 +169,7 @@ final class BinlogHelper { throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller"); } GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() + .setSequenceIdWithinCall(seq) .setType(Type.RECV_MESSAGE) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)); @@ -188,20 +194,21 @@ final class BinlogHelper { * Logs the sending of initial metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ - abstract void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId); + abstract void logSendInitialMetadata( + int seq, Metadata metadata, boolean isServer, CallId callId); /** * Logs the receiving of initial metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ abstract void logRecvInitialMetadata( - Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket); + int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket); /** * Logs the trailing metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ - abstract void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId); + abstract void logTrailingMetadata(int seq, Metadata metadata, boolean isServer, CallId callId); /** * Logs the outbound message. This method logs the appropriate number of bytes from @@ -210,7 +217,8 @@ final class BinlogHelper { * This method takes ownership of {@code message}. */ abstract void logOutboundMessage( - Marshaller marshaller, T message, boolean compressed, boolean isServer, CallId callId); + int seq, Marshaller marshaller, T message, boolean compressed, boolean isServer, + CallId callId); /** * Logs the inbound message. This method logs the appropriate number of bytes from @@ -219,7 +227,8 @@ final class BinlogHelper { * This method takes ownership of {@code message}. */ abstract void logInboundMessage( - Marshaller marshaller, T message, boolean compressed, boolean isServer, CallId callId); + int seq, Marshaller marshaller, T message, boolean compressed, boolean isServer, + CallId callId); /** * Returns the number bytes of the header this writer will log, according to configuration. @@ -245,15 +254,17 @@ final class BinlogHelper { @Override public ClientCall interceptCall( final MethodDescriptor method, CallOptions callOptions, Channel next) { + final AtomicInteger seq = new AtomicInteger(1); return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { - writer.logSendInitialMetadata(headers, CLIENT, callId); + writer.logSendInitialMetadata(seq.getAndIncrement(), headers, CLIENT, callId); ClientCall.Listener wListener = new SimpleForwardingClientCallListener(responseListener) { @Override public void onMessage(RespT message) { writer.logInboundMessage( + seq.getAndIncrement(), method.getResponseMarshaller(), message, DUMMY_IS_COMPRESSED, @@ -265,13 +276,14 @@ final class BinlogHelper { @Override public void onHeaders(Metadata headers) { SocketAddress peer = getPeerSocket(getAttributes()); - writer.logRecvInitialMetadata(headers, CLIENT, callId, peer); + writer.logRecvInitialMetadata( + seq.getAndIncrement(), headers, CLIENT, callId, peer); super.onHeaders(headers); } @Override public void onClose(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, CLIENT, callId); + writer.logTrailingMetadata(seq.getAndIncrement(), trailers, CLIENT, callId); super.onClose(status, trailers); } }; @@ -281,6 +293,7 @@ final class BinlogHelper { @Override public void sendMessage(ReqT message) { writer.logOutboundMessage( + seq.getAndIncrement(), method.getRequestMarshaller(), message, DUMMY_IS_COMPRESSED, @@ -300,12 +313,14 @@ final class BinlogHelper { final ServerCall call, Metadata headers, ServerCallHandler next) { + final AtomicInteger seq = new AtomicInteger(1); SocketAddress peer = getPeerSocket(call.getAttributes()); - writer.logRecvInitialMetadata(headers, SERVER, callId, peer); + writer.logRecvInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId, peer); ServerCall wCall = new SimpleForwardingServerCall(call) { @Override public void sendMessage(RespT message) { writer.logOutboundMessage( + seq.getAndIncrement(), call.getMethodDescriptor().getResponseMarshaller(), message, DUMMY_IS_COMPRESSED, @@ -316,13 +331,13 @@ final class BinlogHelper { @Override public void sendHeaders(Metadata headers) { - writer.logSendInitialMetadata(headers, SERVER, callId); + writer.logSendInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId); super.sendHeaders(headers); } @Override public void close(Status status, Metadata trailers) { - writer.logTrailingMetadata(trailers, SERVER, callId); + writer.logTrailingMetadata(seq.getAndIncrement(), trailers, SERVER, callId); super.close(status, trailers); } }; @@ -331,6 +346,7 @@ final class BinlogHelper { @Override public void onMessage(ReqT message) { writer.logInboundMessage( + seq.getAndIncrement(), call.getMethodDescriptor().getRequestMarshaller(), message, DUMMY_IS_COMPRESSED, diff --git a/services/src/test/java/io/grpc/services/BinlogHelperTest.java b/services/src/test/java/io/grpc/services/BinlogHelperTest.java index 9c7417b70c..7ad9fa66b4 100644 --- a/services/src/test/java/io/grpc/services/BinlogHelperTest.java +++ b/services/src/test/java/io/grpc/services/BinlogHelperTest.java @@ -535,9 +535,10 @@ public final class BinlogHelperTest { @Test public void logSendInitialMetadata_server() throws Exception { - sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID); + sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -546,9 +547,10 @@ public final class BinlogHelperTest { @Test public void logSendInitialMetadata_client() throws Exception { - sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID); + sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -560,9 +562,11 @@ public final class BinlogHelperTest { InetAddress address = InetAddress.getByName("127.0.0.1"); int port = 12345; InetSocketAddress socketAddress = new InetSocketAddress(address, port); - sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress); + sinkWriterImpl.logRecvInitialMetadata( + /*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -575,9 +579,11 @@ public final class BinlogHelperTest { InetAddress address = InetAddress.getByName("127.0.0.1"); int port = 12345; InetSocketAddress socketAddress = new InetSocketAddress(address, port); - sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress); + sinkWriterImpl.logRecvInitialMetadata( + /*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -587,9 +593,10 @@ public final class BinlogHelperTest { @Test public void logTrailingMetadata_server() throws Exception { - sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID); + sinkWriterImpl.logTrailingMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_TRAILING_METADATA) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -598,9 +605,10 @@ public final class BinlogHelperTest { @Test public void logTrailingMetadata_client() throws Exception { - sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID); + sinkWriterImpl.logTrailingMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_TRAILING_METADATA) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -610,18 +618,20 @@ public final class BinlogHelperTest { @Test public void logOutboundMessage_server() throws Exception { sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_MESSAGE) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) .build()); sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_MESSAGE) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -632,18 +642,20 @@ public final class BinlogHelperTest { @Test public void logOutboundMessage_client() throws Exception { sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_MESSAGE) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) .build()); sinkWriterImpl.logOutboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.SEND_MESSAGE) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -654,18 +666,20 @@ public final class BinlogHelperTest { @Test public void logInboundMessage_server() throws Exception { sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_MESSAGE) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) .build()); sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_MESSAGE) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -676,18 +690,20 @@ public final class BinlogHelperTest { @Test public void logInboundMessage_client() throws Exception { sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_MESSAGE) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) .build()); sinkWriterImpl.logInboundMessage( - BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); + /*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID); verify(sink).write( messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder() + .setSequenceIdWithinCall(1) .setType(GrpcLogEntry.Type.RECV_MESSAGE) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -764,6 +780,7 @@ public final class BinlogHelperTest { Metadata clientInitial = new Metadata(); interceptedCall.start(mockListener, clientInitial); verify(mockSinkWriter).logSendInitialMetadata( + /*seq=*/ eq(1), same(clientInitial), eq(IS_CLIENT), same(CALL_ID)); @@ -775,7 +792,9 @@ public final class BinlogHelperTest { { Metadata serverInitial = new Metadata(); interceptedListener.get().onHeaders(serverInitial); - verify(mockSinkWriter).logRecvInitialMetadata(same(serverInitial), + verify(mockSinkWriter).logRecvInitialMetadata( + /*seq=*/ eq(2), + same(serverInitial), eq(IS_CLIENT), same(CALL_ID), same(peer)); @@ -788,6 +807,7 @@ public final class BinlogHelperTest { byte[] request = "this is a request".getBytes(US_ASCII); interceptedCall.sendMessage(request); verify(mockSinkWriter).logOutboundMessage( + /*seq=*/ eq(3), same(BYTEARRAY_MARSHALLER), same(request), eq(BinlogHelper.DUMMY_IS_COMPRESSED), @@ -802,6 +822,7 @@ public final class BinlogHelperTest { byte[] response = "this is a response".getBytes(US_ASCII); interceptedListener.get().onMessage(response); verify(mockSinkWriter).logInboundMessage( + /*seq=*/ eq(4), same(BYTEARRAY_MARSHALLER), eq(response), eq(BinlogHelper.DUMMY_IS_COMPRESSED), @@ -818,6 +839,7 @@ public final class BinlogHelperTest { interceptedListener.get().onClose(status, trailers); verify(mockSinkWriter).logTrailingMetadata( + /*seq=*/ eq(5), same(trailers), eq(IS_CLIENT), same(CALL_ID)); @@ -894,6 +916,7 @@ public final class BinlogHelperTest { } }); verify(mockSinkWriter).logRecvInitialMetadata( + /*seq=*/ eq(1), same(clientInitial), eq(IS_SERVER), same(CALL_ID), @@ -906,6 +929,7 @@ public final class BinlogHelperTest { Metadata serverInital = new Metadata(); interceptedCall.get().sendHeaders(serverInital); verify(mockSinkWriter).logSendInitialMetadata( + /*seq=*/ eq(2), same(serverInital), eq(IS_SERVER), same(CALL_ID)); @@ -918,6 +942,7 @@ public final class BinlogHelperTest { byte[] request = "this is a request".getBytes(US_ASCII); capturedListener.onMessage(request); verify(mockSinkWriter).logInboundMessage( + /*seq=*/ eq(3), same(BYTEARRAY_MARSHALLER), same(request), eq(BinlogHelper.DUMMY_IS_COMPRESSED), @@ -932,6 +957,7 @@ public final class BinlogHelperTest { byte[] response = "this is a response".getBytes(US_ASCII); interceptedCall.get().sendMessage(response); verify(mockSinkWriter).logOutboundMessage( + /*seq=*/ eq(4), same(BYTEARRAY_MARSHALLER), same(response), eq(BinlogHelper.DUMMY_IS_COMPRESSED), @@ -947,6 +973,7 @@ public final class BinlogHelperTest { Metadata trailers = new Metadata(); interceptedCall.get().close(status, trailers); verify(mockSinkWriter).logTrailingMetadata( + /*seq=*/ eq(5), same(trailers), eq(IS_SERVER), same(CALL_ID));