diff --git a/services/src/main/java/io/grpc/services/BinlogHelper.java b/services/src/main/java/io/grpc/services/BinlogHelper.java index 566b63e94a..f731ea3cd9 100644 --- a/services/src/main/java/io/grpc/services/BinlogHelper.java +++ b/services/src/main/java/io/grpc/services/BinlogHelper.java @@ -21,8 +21,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.services.BinaryLogProvider.BYTEARRAY_MARSHALLER; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.protobuf.ByteString; +import com.google.protobuf.Duration; +import com.google.protobuf.util.Durations; import com.google.re2j.Matcher; import com.google.re2j.Pattern; import io.grpc.Attributes; @@ -31,6 +34,8 @@ import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; @@ -52,6 +57,7 @@ import io.grpc.binarylog.Metadata.Builder; import io.grpc.binarylog.Peer; import io.grpc.binarylog.Peer.PeerType; import io.grpc.binarylog.Uint128; +import io.grpc.internal.GrpcUtil; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; @@ -62,6 +68,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -103,19 +110,41 @@ final class BinlogHelper { } @Override - void logSendInitialMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) { + void logSendInitialMetadata( + int seq, + @Nullable String methodName, // null on server + @Nullable Duration timeout, // null on server + Metadata metadata, + boolean isServer, + CallId callId) { + Preconditions.checkArgument(methodName == null || !isServer); + Preconditions.checkArgument(timeout == null || !isServer); GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() .setSequenceIdWithinCall(seq) .setType(Type.SEND_INITIAL_METADATA) .setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT) .setCallId(callIdToProto(callId)); addMetadataToProto(entryBuilder, metadata, maxHeaderBytes); + if (methodName != null) { + entryBuilder.setMethodName(methodName); + } + if (timeout != null) { + entryBuilder.setTimeout(timeout); + } sink.write(entryBuilder.build()); } @Override void logRecvInitialMetadata( - int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) { + int seq, + @Nullable String methodName, // null on client + @Nullable Duration timeout, // null on client + Metadata metadata, + boolean isServer, + CallId callId, + SocketAddress peerSocket) { + Preconditions.checkArgument(methodName == null || isServer); + Preconditions.checkArgument(timeout == null || isServer); GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder() .setSequenceIdWithinCall(seq) .setType(Type.RECV_INITIAL_METADATA) @@ -123,6 +152,12 @@ final class BinlogHelper { .setCallId(callIdToProto(callId)) .setPeer(socketToProto(peerSocket)); addMetadataToProto(entryBuilder, metadata, maxHeaderBytes); + if (methodName != null) { + entryBuilder.setMethodName(methodName); + } + if (timeout != null) { + entryBuilder.setTimeout(timeout); + } sink.write(entryBuilder.build()); } @@ -195,14 +230,25 @@ final class BinlogHelper { * as determined by the binary logging configuration. */ abstract void logSendInitialMetadata( - int seq, Metadata metadata, boolean isServer, CallId callId); + int seq, + String methodName, + Duration timeout, + Metadata metadata, + boolean isServer, + CallId callId); /** * Logs the receiving of initial metadata. This method logs the appropriate number of bytes * as determined by the binary logging configuration. */ abstract void logRecvInitialMetadata( - int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket); + int seq, + String methodName, + Duration timeout, + Metadata metadata, + boolean isServer, + CallId callId, + SocketAddress peerSocket); /** * Logs the trailing metadata. This method logs the appropriate number of bytes @@ -249,16 +295,34 @@ final class BinlogHelper { return peer; } + private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) { + if (deadline0 == null) { + return deadline1; + } + if (deadline1 == null) { + return deadline0; + } + return deadline0.minimum(deadline1); + } + public ClientInterceptor getClientInterceptor(final CallId callId) { return new ClientInterceptor() { @Override public ClientCall interceptCall( final MethodDescriptor method, CallOptions callOptions, Channel next) { final AtomicInteger seq = new AtomicInteger(1); + final String methodName = method.getFullMethodName(); + // The timeout should reflect the time remaining when the call is started, so do not + // compute remaining time here. + final Deadline deadline = min(callOptions.getDeadline(), Context.current().getDeadline()); + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { - writer.logSendInitialMetadata(seq.getAndIncrement(), headers, CLIENT, callId); + final Duration timeout = deadline == null ? null + : Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS)); + writer.logSendInitialMetadata( + seq.getAndIncrement(), methodName, timeout, headers, CLIENT, callId); ClientCall.Listener wListener = new SimpleForwardingClientCallListener(responseListener) { @Override @@ -277,7 +341,13 @@ final class BinlogHelper { public void onHeaders(Metadata headers) { SocketAddress peer = getPeerSocket(getAttributes()); writer.logRecvInitialMetadata( - seq.getAndIncrement(), headers, CLIENT, callId, peer); + seq.getAndIncrement(), + /*methodName=*/ null, + /*timeout=*/ null, + headers, + CLIENT, + callId, + peer); super.onHeaders(headers); } @@ -315,7 +385,13 @@ final class BinlogHelper { ServerCallHandler next) { final AtomicInteger seq = new AtomicInteger(1); SocketAddress peer = getPeerSocket(call.getAttributes()); - writer.logRecvInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId, peer); + String methodName = call.getMethodDescriptor().getFullMethodName(); + Long timeoutNanos = headers.get(GrpcUtil.TIMEOUT_KEY); + final Duration timeout = + timeoutNanos == null ? null : Durations.fromNanos(timeoutNanos); + + writer.logRecvInitialMetadata( + seq.getAndIncrement(), methodName, timeout, headers, SERVER, callId, peer); ServerCall wCall = new SimpleForwardingServerCall(call) { @Override public void sendMessage(RespT message) { @@ -331,7 +407,13 @@ final class BinlogHelper { @Override public void sendHeaders(Metadata headers) { - writer.logSendInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId); + writer.logSendInitialMetadata( + seq.getAndIncrement(), + /*methodName=*/ null, + /*timeout=*/ null, + headers, + SERVER, + callId); super.sendHeaders(headers); } diff --git a/services/src/test/java/io/grpc/services/BinlogHelperTest.java b/services/src/test/java/io/grpc/services/BinlogHelperTest.java index 7ad9fa66b4..cd736a9903 100644 --- a/services/src/test/java/io/grpc/services/BinlogHelperTest.java +++ b/services/src/test/java/io/grpc/services/BinlogHelperTest.java @@ -16,6 +16,7 @@ package io.grpc.services; +import static com.google.common.truth.Truth.assertThat; import static io.grpc.services.BinaryLogProvider.BYTEARRAY_MARSHALLER; import static io.grpc.services.BinlogHelper.DUMMY_SOCKET; import static io.grpc.services.BinlogHelper.getPeerSocket; @@ -23,18 +24,25 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.same; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; +import com.google.protobuf.Duration; +import com.google.protobuf.util.Durations; import io.grpc.Attributes; import io.grpc.BinaryLog.CallId; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; +import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.Grpc; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -48,6 +56,7 @@ import io.grpc.binarylog.MetadataEntry; import io.grpc.binarylog.Peer; import io.grpc.binarylog.Peer.PeerType; import io.grpc.binarylog.Uint128; +import io.grpc.internal.GrpcUtil; import io.grpc.internal.NoopClientCall; import io.grpc.internal.NoopServerCall; import io.grpc.services.BinlogHelper.FactoryImpl; @@ -58,11 +67,14 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.Charset; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; /** Tests for {@link BinlogHelper}. */ @RunWith(JUnit4.class) @@ -535,7 +547,13 @@ public final class BinlogHelperTest { @Test public void logSendInitialMetadata_server() throws Exception { - sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID); + sinkWriterImpl.logSendInitialMetadata( + /*seq=*/ 1, + /*methodNmae=*/ null, + /*timeout=*/ null, + nonEmptyMetadata, + IS_SERVER, + CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() .setSequenceIdWithinCall(1) @@ -547,10 +565,34 @@ public final class BinlogHelperTest { @Test public void logSendInitialMetadata_client() throws Exception { - sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID); + sinkWriterImpl.logSendInitialMetadata( + /*seq=*/ 1, + "service/method", + Durations.fromMillis(1234), + nonEmptyMetadata, + IS_CLIENT, + CALL_ID); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() .setSequenceIdWithinCall(1) + .setMethodName("service/method") + .setTimeout(Durations.fromMillis(1234)) + .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.CLIENT) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .build()); + + sinkWriterImpl.logSendInitialMetadata( + /*seq=*/ 1, + "service/method", + /*timeout=*/ null, + nonEmptyMetadata, + IS_CLIENT, + CALL_ID); + verify(sink).write( + metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) + .setMethodName("service/method") .setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.CLIENT) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -563,10 +605,36 @@ public final class BinlogHelperTest { int port = 12345; InetSocketAddress socketAddress = new InetSocketAddress(address, port); sinkWriterImpl.logRecvInitialMetadata( - /*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress); + /*seq=*/ 1, + "service/method", + Durations.fromMillis(1234), + nonEmptyMetadata, + IS_SERVER, + CALL_ID, + socketAddress); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() .setSequenceIdWithinCall(1) + .setMethodName("service/method") + .setTimeout(Durations.fromMillis(1234)) + .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) + .setLogger(GrpcLogEntry.Logger.SERVER) + .setCallId(BinlogHelper.callIdToProto(CALL_ID)) + .setPeer(BinlogHelper.socketToProto(socketAddress)) + .build()); + + sinkWriterImpl.logRecvInitialMetadata( + /*seq=*/ 1, + "service/method", + /*timeout=*/ null, + nonEmptyMetadata, + IS_SERVER, + CALL_ID, + socketAddress); + verify(sink).write( + metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() + .setSequenceIdWithinCall(1) + .setMethodName("service/method") .setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA) .setLogger(GrpcLogEntry.Logger.SERVER) .setCallId(BinlogHelper.callIdToProto(CALL_ID)) @@ -580,7 +648,13 @@ public final class BinlogHelperTest { int port = 12345; InetSocketAddress socketAddress = new InetSocketAddress(address, port); sinkWriterImpl.logRecvInitialMetadata( - /*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress); + /*seq=*/ 1, + /*methodNmae=*/ null, + /*timeout=*/ null, + nonEmptyMetadata, + IS_CLIENT, + CALL_ID, + socketAddress); verify(sink).write( metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder() .setSequenceIdWithinCall(1) @@ -719,6 +793,107 @@ public final class BinlogHelperTest { getPeerSocket(Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build())); } + @Test + @SuppressWarnings({"unchecked"}) + public void clientDeadlineLogged_deadlineSetViaCallOption() { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + + ClientCall call = + new BinlogHelper(mockSinkWriter) + .getClientInterceptor(CALL_ID) + .interceptCall( + method, + CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall(); + } + + @Override + public String authority() { + return null; + } + }); + call.start(mockListener, new Metadata()); + ArgumentCaptor callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); + verify(mockSinkWriter).logSendInitialMetadata( + any(Integer.class), + any(String.class), + callOptTimeoutCaptor.capture(), + any(Metadata.class), + any(Boolean.class), + any(CallId.class)); + Duration timeout = callOptTimeoutCaptor.getValue(); + assertThat(TimeUnit.SECONDS.toNanos(1) - timeout.getNanos()) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + + @Test + @SuppressWarnings({"unchecked"}) + public void clientDeadlineLogged_deadlineSetViaContext() throws Exception { + // important: deadline is read from the ctx where call was created + final SettableFuture> callFuture = SettableFuture.create(); + Context.current() + .withDeadline( + Deadline.after(1, TimeUnit.SECONDS), Executors.newSingleThreadScheduledExecutor()) + .run(new Runnable() { + @Override + public void run() { + MethodDescriptor method = + MethodDescriptor.newBuilder() + .setType(MethodType.UNKNOWN) + .setFullMethodName("service/method") + .setRequestMarshaller(BYTEARRAY_MARSHALLER) + .setResponseMarshaller(BYTEARRAY_MARSHALLER) + .build(); + + callFuture.set(new BinlogHelper(mockSinkWriter) + .getClientInterceptor(CALL_ID) + .interceptCall( + method, + CallOptions.DEFAULT.withOption( + BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, CALL_ID) + .withDeadlineAfter(1, TimeUnit.SECONDS), + new Channel() { + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, + CallOptions callOptions) { + return new NoopClientCall(); + } + + @Override + public String authority() { + return null; + } + })); + } + }); + ClientCall.Listener mockListener = mock(ClientCall.Listener.class); + callFuture.get().start(mockListener, new Metadata()); + ArgumentCaptor callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class); + verify(mockSinkWriter).logSendInitialMetadata( + any(Integer.class), + any(String.class), + callOptTimeoutCaptor.capture(), + any(Metadata.class), + any(Boolean.class), + any(CallId.class)); + Duration timeout = callOptTimeoutCaptor.getValue(); + assertThat(TimeUnit.SECONDS.toNanos(1) - timeout.getNanos()) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); + } + @Test @SuppressWarnings({"rawtypes", "unchecked"}) public void clientInterceptor() throws Exception { @@ -771,20 +946,25 @@ public final class BinlogHelperTest { .getClientInterceptor(CALL_ID) .interceptCall( method, - CallOptions.DEFAULT.withOption( - BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, CALL_ID), + CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS), channel); // send initial metadata { Metadata clientInitial = new Metadata(); interceptedCall.start(mockListener, clientInitial); + ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Duration.class); verify(mockSinkWriter).logSendInitialMetadata( /*seq=*/ eq(1), + eq("service/method"), + timeoutCaptor.capture(), same(clientInitial), eq(IS_CLIENT), same(CALL_ID)); verifyNoMoreInteractions(mockSinkWriter); + Duration timeout = timeoutCaptor.getValue(); + assertThat(TimeUnit.SECONDS.toNanos(1) - timeout.getNanos()) + .isAtMost(TimeUnit.MILLISECONDS.toNanos(250)); assertSame(clientInitial, actualClientInitial.get()); } @@ -794,6 +974,8 @@ public final class BinlogHelperTest { interceptedListener.get().onHeaders(serverInitial); verify(mockSinkWriter).logRecvInitialMetadata( /*seq=*/ eq(2), + isNull(String.class), + isNull(Duration.class), same(serverInitial), eq(IS_CLIENT), same(CALL_ID), @@ -864,6 +1046,7 @@ public final class BinlogHelperTest { // begin call and receive initial metadata { Metadata clientInitial = new Metadata(); + clientInitial.put(GrpcUtil.TIMEOUT_KEY, TimeUnit.MILLISECONDS.toNanos(1234)); final MethodDescriptor method = MethodDescriptor.newBuilder() .setType(MethodType.UNKNOWN) @@ -917,6 +1100,8 @@ public final class BinlogHelperTest { }); verify(mockSinkWriter).logRecvInitialMetadata( /*seq=*/ eq(1), + eq("service/method"), + eq(Durations.fromMillis(1234)), same(clientInitial), eq(IS_SERVER), same(CALL_ID), @@ -930,6 +1115,8 @@ public final class BinlogHelperTest { interceptedCall.get().sendHeaders(serverInital); verify(mockSinkWriter).logSendInitialMetadata( /*seq=*/ eq(2), + isNull(String.class), + isNull(Duration.class), same(serverInital), eq(IS_SERVER), same(CALL_ID));