services: add sequence id within call for entries (#4549)

This is a new field added to the binlog proto so that we can detect when
the storage impl reorders or drops recorded entries.
This commit is contained in:
zpencer 2018-06-07 12:19:36 -07:00 committed by GitHub
parent bc9d3ab7ca
commit c05d0f40ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 74 additions and 31 deletions

View File

@ -62,6 +62,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@ -76,8 +77,6 @@ final class BinlogHelper {
private static final boolean SERVER = true;
private static final boolean CLIENT = false;
@VisibleForTesting
static final CallId emptyCallId = new CallId(0, 0);
@VisibleForTesting
static final SocketAddress DUMMY_SOCKET = new DummySocketAddress();
@VisibleForTesting
@ -104,8 +103,9 @@ final class BinlogHelper {
}
@Override
void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId) {
void logSendInitialMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) {
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
.setSequenceIdWithinCall(seq)
.setType(Type.SEND_INITIAL_METADATA)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId));
@ -115,8 +115,9 @@ final class BinlogHelper {
@Override
void logRecvInitialMetadata(
Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) {
int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) {
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
.setSequenceIdWithinCall(seq)
.setType(Type.RECV_INITIAL_METADATA)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId))
@ -126,8 +127,9 @@ final class BinlogHelper {
}
@Override
void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId) {
void logTrailingMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) {
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
.setSequenceIdWithinCall(seq)
.setType(isServer ? Type.SEND_TRAILING_METADATA : Type.RECV_TRAILING_METADATA)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId));
@ -137,6 +139,7 @@ final class BinlogHelper {
@Override
<T> void logOutboundMessage(
int seq,
Marshaller<T> marshaller,
T message,
boolean compressed,
@ -146,6 +149,7 @@ final class BinlogHelper {
throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller");
}
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
.setSequenceIdWithinCall(seq)
.setType(Type.SEND_MESSAGE)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId));
@ -155,6 +159,7 @@ final class BinlogHelper {
@Override
<T> void logInboundMessage(
int seq,
Marshaller<T> marshaller,
T message,
boolean compressed,
@ -164,6 +169,7 @@ final class BinlogHelper {
throw new IllegalStateException("Expected the BinaryLog's ByteArrayMarshaller");
}
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
.setSequenceIdWithinCall(seq)
.setType(Type.RECV_MESSAGE)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId));
@ -188,20 +194,21 @@ final class BinlogHelper {
* Logs the sending of initial metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration.
*/
abstract void logSendInitialMetadata(Metadata metadata, boolean isServer, CallId callId);
abstract void logSendInitialMetadata(
int seq, Metadata metadata, boolean isServer, CallId callId);
/**
* Logs the receiving of initial metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration.
*/
abstract void logRecvInitialMetadata(
Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket);
int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket);
/**
* Logs the trailing metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration.
*/
abstract void logTrailingMetadata(Metadata metadata, boolean isServer, CallId callId);
abstract void logTrailingMetadata(int seq, Metadata metadata, boolean isServer, CallId callId);
/**
* Logs the outbound message. This method logs the appropriate number of bytes from
@ -210,7 +217,8 @@ final class BinlogHelper {
* This method takes ownership of {@code message}.
*/
abstract <T> void logOutboundMessage(
Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, CallId callId);
int seq, Marshaller<T> marshaller, T message, boolean compressed, boolean isServer,
CallId callId);
/**
* Logs the inbound message. This method logs the appropriate number of bytes from
@ -219,7 +227,8 @@ final class BinlogHelper {
* This method takes ownership of {@code message}.
*/
abstract <T> void logInboundMessage(
Marshaller<T> marshaller, T message, boolean compressed, boolean isServer, CallId callId);
int seq, Marshaller<T> marshaller, T message, boolean compressed, boolean isServer,
CallId callId);
/**
* Returns the number bytes of the header this writer will log, according to configuration.
@ -245,15 +254,17 @@ final class BinlogHelper {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final AtomicInteger seq = new AtomicInteger(1);
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
writer.logSendInitialMetadata(headers, CLIENT, callId);
writer.logSendInitialMetadata(seq.getAndIncrement(), headers, CLIENT, callId);
ClientCall.Listener<RespT> wListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onMessage(RespT message) {
writer.logInboundMessage(
seq.getAndIncrement(),
method.getResponseMarshaller(),
message,
DUMMY_IS_COMPRESSED,
@ -265,13 +276,14 @@ final class BinlogHelper {
@Override
public void onHeaders(Metadata headers) {
SocketAddress peer = getPeerSocket(getAttributes());
writer.logRecvInitialMetadata(headers, CLIENT, callId, peer);
writer.logRecvInitialMetadata(
seq.getAndIncrement(), headers, CLIENT, callId, peer);
super.onHeaders(headers);
}
@Override
public void onClose(Status status, Metadata trailers) {
writer.logTrailingMetadata(trailers, CLIENT, callId);
writer.logTrailingMetadata(seq.getAndIncrement(), trailers, CLIENT, callId);
super.onClose(status, trailers);
}
};
@ -281,6 +293,7 @@ final class BinlogHelper {
@Override
public void sendMessage(ReqT message) {
writer.logOutboundMessage(
seq.getAndIncrement(),
method.getRequestMarshaller(),
message,
DUMMY_IS_COMPRESSED,
@ -300,12 +313,14 @@ final class BinlogHelper {
final ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
final AtomicInteger seq = new AtomicInteger(1);
SocketAddress peer = getPeerSocket(call.getAttributes());
writer.logRecvInitialMetadata(headers, SERVER, callId, peer);
writer.logRecvInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId, peer);
ServerCall<ReqT, RespT> wCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendMessage(RespT message) {
writer.logOutboundMessage(
seq.getAndIncrement(),
call.getMethodDescriptor().getResponseMarshaller(),
message,
DUMMY_IS_COMPRESSED,
@ -316,13 +331,13 @@ final class BinlogHelper {
@Override
public void sendHeaders(Metadata headers) {
writer.logSendInitialMetadata(headers, SERVER, callId);
writer.logSendInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId);
super.sendHeaders(headers);
}
@Override
public void close(Status status, Metadata trailers) {
writer.logTrailingMetadata(trailers, SERVER, callId);
writer.logTrailingMetadata(seq.getAndIncrement(), trailers, SERVER, callId);
super.close(status, trailers);
}
};
@ -331,6 +346,7 @@ final class BinlogHelper {
@Override
public void onMessage(ReqT message) {
writer.logInboundMessage(
seq.getAndIncrement(),
call.getMethodDescriptor().getRequestMarshaller(),
message,
DUMMY_IS_COMPRESSED,

View File

@ -535,9 +535,10 @@ public final class BinlogHelperTest {
@Test
public void logSendInitialMetadata_server() throws Exception {
sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID);
sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -546,9 +547,10 @@ public final class BinlogHelperTest {
@Test
public void logSendInitialMetadata_client() throws Exception {
sinkWriterImpl.logSendInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID);
sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -560,9 +562,11 @@ public final class BinlogHelperTest {
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress);
sinkWriterImpl.logRecvInitialMetadata(
/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -575,9 +579,11 @@ public final class BinlogHelperTest {
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
sinkWriterImpl.logRecvInitialMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress);
sinkWriterImpl.logRecvInitialMetadata(
/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -587,9 +593,10 @@ public final class BinlogHelperTest {
@Test
public void logTrailingMetadata_server() throws Exception {
sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_SERVER, CALL_ID);
sinkWriterImpl.logTrailingMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_TRAILING_METADATA)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -598,9 +605,10 @@ public final class BinlogHelperTest {
@Test
public void logTrailingMetadata_client() throws Exception {
sinkWriterImpl.logTrailingMetadata(nonEmptyMetadata, IS_CLIENT, CALL_ID);
sinkWriterImpl.logTrailingMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_TRAILING_METADATA)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -610,18 +618,20 @@ public final class BinlogHelperTest {
@Test
public void logOutboundMessage_server() throws Exception {
sinkWriterImpl.logOutboundMessage(
BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID);
/*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_MESSAGE)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.build());
sinkWriterImpl.logOutboundMessage(
BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
/*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_MESSAGE)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -632,18 +642,20 @@ public final class BinlogHelperTest {
@Test
public void logOutboundMessage_client() throws Exception {
sinkWriterImpl.logOutboundMessage(
BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
/*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_MESSAGE)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.build());
sinkWriterImpl.logOutboundMessage(
BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
/*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.SEND_MESSAGE)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -654,18 +666,20 @@ public final class BinlogHelperTest {
@Test
public void logInboundMessage_server() throws Exception {
sinkWriterImpl.logInboundMessage(
BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID);
/*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_SERVER, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_MESSAGE)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.build());
sinkWriterImpl.logInboundMessage(
BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
/*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_SERVER, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_MESSAGE)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -676,18 +690,20 @@ public final class BinlogHelperTest {
@Test
public void logInboundMessage_client() throws Exception {
sinkWriterImpl.logInboundMessage(
BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
/*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_COMPRESSED, IS_CLIENT, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_COMPRESSED, MESSAGE_LIMIT).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_MESSAGE)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.build());
sinkWriterImpl.logInboundMessage(
BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
/*seq=*/ 1, BYTEARRAY_MARSHALLER, message, IS_UNCOMPRESSED, IS_CLIENT, CALL_ID);
verify(sink).write(
messageToProtoTestHelper(message, IS_UNCOMPRESSED, MESSAGE_LIMIT).toBuilder()
.setSequenceIdWithinCall(1)
.setType(GrpcLogEntry.Type.RECV_MESSAGE)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -764,6 +780,7 @@ public final class BinlogHelperTest {
Metadata clientInitial = new Metadata();
interceptedCall.start(mockListener, clientInitial);
verify(mockSinkWriter).logSendInitialMetadata(
/*seq=*/ eq(1),
same(clientInitial),
eq(IS_CLIENT),
same(CALL_ID));
@ -775,7 +792,9 @@ public final class BinlogHelperTest {
{
Metadata serverInitial = new Metadata();
interceptedListener.get().onHeaders(serverInitial);
verify(mockSinkWriter).logRecvInitialMetadata(same(serverInitial),
verify(mockSinkWriter).logRecvInitialMetadata(
/*seq=*/ eq(2),
same(serverInitial),
eq(IS_CLIENT),
same(CALL_ID),
same(peer));
@ -788,6 +807,7 @@ public final class BinlogHelperTest {
byte[] request = "this is a request".getBytes(US_ASCII);
interceptedCall.sendMessage(request);
verify(mockSinkWriter).logOutboundMessage(
/*seq=*/ eq(3),
same(BYTEARRAY_MARSHALLER),
same(request),
eq(BinlogHelper.DUMMY_IS_COMPRESSED),
@ -802,6 +822,7 @@ public final class BinlogHelperTest {
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedListener.get().onMessage(response);
verify(mockSinkWriter).logInboundMessage(
/*seq=*/ eq(4),
same(BYTEARRAY_MARSHALLER),
eq(response),
eq(BinlogHelper.DUMMY_IS_COMPRESSED),
@ -818,6 +839,7 @@ public final class BinlogHelperTest {
interceptedListener.get().onClose(status, trailers);
verify(mockSinkWriter).logTrailingMetadata(
/*seq=*/ eq(5),
same(trailers),
eq(IS_CLIENT),
same(CALL_ID));
@ -894,6 +916,7 @@ public final class BinlogHelperTest {
}
});
verify(mockSinkWriter).logRecvInitialMetadata(
/*seq=*/ eq(1),
same(clientInitial),
eq(IS_SERVER),
same(CALL_ID),
@ -906,6 +929,7 @@ public final class BinlogHelperTest {
Metadata serverInital = new Metadata();
interceptedCall.get().sendHeaders(serverInital);
verify(mockSinkWriter).logSendInitialMetadata(
/*seq=*/ eq(2),
same(serverInital),
eq(IS_SERVER),
same(CALL_ID));
@ -918,6 +942,7 @@ public final class BinlogHelperTest {
byte[] request = "this is a request".getBytes(US_ASCII);
capturedListener.onMessage(request);
verify(mockSinkWriter).logInboundMessage(
/*seq=*/ eq(3),
same(BYTEARRAY_MARSHALLER),
same(request),
eq(BinlogHelper.DUMMY_IS_COMPRESSED),
@ -932,6 +957,7 @@ public final class BinlogHelperTest {
byte[] response = "this is a response".getBytes(US_ASCII);
interceptedCall.get().sendMessage(response);
verify(mockSinkWriter).logOutboundMessage(
/*seq=*/ eq(4),
same(BYTEARRAY_MARSHALLER),
same(response),
eq(BinlogHelper.DUMMY_IS_COMPRESSED),
@ -947,6 +973,7 @@ public final class BinlogHelperTest {
Metadata trailers = new Metadata();
interceptedCall.get().close(status, trailers);
verify(mockSinkWriter).logTrailingMetadata(
/*seq=*/ eq(5),
same(trailers),
eq(IS_SERVER),
same(CALL_ID));