services: binlog deadline and methodName at start of rpc (#4550)

These are first class concepts and should be pulled to top level of
proto.
This commit is contained in:
zpencer 2018-06-07 19:16:34 -07:00 committed by GitHub
parent 11e2d6d7f1
commit 2a971e3b05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 283 additions and 14 deletions

View File

@ -21,8 +21,11 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.services.BinaryLogProvider.BYTEARRAY_MARSHALLER;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import com.google.re2j.Matcher;
import com.google.re2j.Pattern;
import io.grpc.Attributes;
@ -31,6 +34,8 @@ import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
@ -52,6 +57,7 @@ import io.grpc.binarylog.Metadata.Builder;
import io.grpc.binarylog.Peer;
import io.grpc.binarylog.Peer.PeerType;
import io.grpc.binarylog.Uint128;
import io.grpc.internal.GrpcUtil;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
@ -62,6 +68,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -103,19 +110,41 @@ final class BinlogHelper {
}
@Override
void logSendInitialMetadata(int seq, Metadata metadata, boolean isServer, CallId callId) {
void logSendInitialMetadata(
int seq,
@Nullable String methodName, // null on server
@Nullable Duration timeout, // null on server
Metadata metadata,
boolean isServer,
CallId callId) {
Preconditions.checkArgument(methodName == null || !isServer);
Preconditions.checkArgument(timeout == null || !isServer);
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
.setSequenceIdWithinCall(seq)
.setType(Type.SEND_INITIAL_METADATA)
.setLogger(isServer ? GrpcLogEntry.Logger.SERVER : GrpcLogEntry.Logger.CLIENT)
.setCallId(callIdToProto(callId));
addMetadataToProto(entryBuilder, metadata, maxHeaderBytes);
if (methodName != null) {
entryBuilder.setMethodName(methodName);
}
if (timeout != null) {
entryBuilder.setTimeout(timeout);
}
sink.write(entryBuilder.build());
}
@Override
void logRecvInitialMetadata(
int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket) {
int seq,
@Nullable String methodName, // null on client
@Nullable Duration timeout, // null on client
Metadata metadata,
boolean isServer,
CallId callId,
SocketAddress peerSocket) {
Preconditions.checkArgument(methodName == null || isServer);
Preconditions.checkArgument(timeout == null || isServer);
GrpcLogEntry.Builder entryBuilder = GrpcLogEntry.newBuilder()
.setSequenceIdWithinCall(seq)
.setType(Type.RECV_INITIAL_METADATA)
@ -123,6 +152,12 @@ final class BinlogHelper {
.setCallId(callIdToProto(callId))
.setPeer(socketToProto(peerSocket));
addMetadataToProto(entryBuilder, metadata, maxHeaderBytes);
if (methodName != null) {
entryBuilder.setMethodName(methodName);
}
if (timeout != null) {
entryBuilder.setTimeout(timeout);
}
sink.write(entryBuilder.build());
}
@ -195,14 +230,25 @@ final class BinlogHelper {
* as determined by the binary logging configuration.
*/
abstract void logSendInitialMetadata(
int seq, Metadata metadata, boolean isServer, CallId callId);
int seq,
String methodName,
Duration timeout,
Metadata metadata,
boolean isServer,
CallId callId);
/**
* Logs the receiving of initial metadata. This method logs the appropriate number of bytes
* as determined by the binary logging configuration.
*/
abstract void logRecvInitialMetadata(
int seq, Metadata metadata, boolean isServer, CallId callId, SocketAddress peerSocket);
int seq,
String methodName,
Duration timeout,
Metadata metadata,
boolean isServer,
CallId callId,
SocketAddress peerSocket);
/**
* Logs the trailing metadata. This method logs the appropriate number of bytes
@ -249,16 +295,34 @@ final class BinlogHelper {
return peer;
}
private static Deadline min(@Nullable Deadline deadline0, @Nullable Deadline deadline1) {
if (deadline0 == null) {
return deadline1;
}
if (deadline1 == null) {
return deadline0;
}
return deadline0.minimum(deadline1);
}
public ClientInterceptor getClientInterceptor(final CallId callId) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
final AtomicInteger seq = new AtomicInteger(1);
final String methodName = method.getFullMethodName();
// The timeout should reflect the time remaining when the call is started, so do not
// compute remaining time here.
final Deadline deadline = min(callOptions.getDeadline(), Context.current().getDeadline());
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
writer.logSendInitialMetadata(seq.getAndIncrement(), headers, CLIENT, callId);
final Duration timeout = deadline == null ? null
: Durations.fromNanos(deadline.timeRemaining(TimeUnit.NANOSECONDS));
writer.logSendInitialMetadata(
seq.getAndIncrement(), methodName, timeout, headers, CLIENT, callId);
ClientCall.Listener<RespT> wListener =
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
@ -277,7 +341,13 @@ final class BinlogHelper {
public void onHeaders(Metadata headers) {
SocketAddress peer = getPeerSocket(getAttributes());
writer.logRecvInitialMetadata(
seq.getAndIncrement(), headers, CLIENT, callId, peer);
seq.getAndIncrement(),
/*methodName=*/ null,
/*timeout=*/ null,
headers,
CLIENT,
callId,
peer);
super.onHeaders(headers);
}
@ -315,7 +385,13 @@ final class BinlogHelper {
ServerCallHandler<ReqT, RespT> next) {
final AtomicInteger seq = new AtomicInteger(1);
SocketAddress peer = getPeerSocket(call.getAttributes());
writer.logRecvInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId, peer);
String methodName = call.getMethodDescriptor().getFullMethodName();
Long timeoutNanos = headers.get(GrpcUtil.TIMEOUT_KEY);
final Duration timeout =
timeoutNanos == null ? null : Durations.fromNanos(timeoutNanos);
writer.logRecvInitialMetadata(
seq.getAndIncrement(), methodName, timeout, headers, SERVER, callId, peer);
ServerCall<ReqT, RespT> wCall = new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendMessage(RespT message) {
@ -331,7 +407,13 @@ final class BinlogHelper {
@Override
public void sendHeaders(Metadata headers) {
writer.logSendInitialMetadata(seq.getAndIncrement(), headers, SERVER, callId);
writer.logSendInitialMetadata(
seq.getAndIncrement(),
/*methodName=*/ null,
/*timeout=*/ null,
headers,
SERVER,
callId);
super.sendHeaders(headers);
}

View File

@ -16,6 +16,7 @@
package io.grpc.services;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.services.BinaryLogProvider.BYTEARRAY_MARSHALLER;
import static io.grpc.services.BinlogHelper.DUMMY_SOCKET;
import static io.grpc.services.BinlogHelper.getPeerSocket;
@ -23,18 +24,25 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
import io.grpc.BinaryLog.CallId;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Deadline;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -48,6 +56,7 @@ import io.grpc.binarylog.MetadataEntry;
import io.grpc.binarylog.Peer;
import io.grpc.binarylog.Peer.PeerType;
import io.grpc.binarylog.Uint128;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.NoopClientCall;
import io.grpc.internal.NoopServerCall;
import io.grpc.services.BinlogHelper.FactoryImpl;
@ -58,11 +67,14 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.Charset;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
/** Tests for {@link BinlogHelper}. */
@RunWith(JUnit4.class)
@ -535,7 +547,13 @@ public final class BinlogHelperTest {
@Test
public void logSendInitialMetadata_server() throws Exception {
sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID);
sinkWriterImpl.logSendInitialMetadata(
/*seq=*/ 1,
/*methodNmae=*/ null,
/*timeout=*/ null,
nonEmptyMetadata,
IS_SERVER,
CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
@ -547,10 +565,34 @@ public final class BinlogHelperTest {
@Test
public void logSendInitialMetadata_client() throws Exception {
sinkWriterImpl.logSendInitialMetadata(/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID);
sinkWriterImpl.logSendInitialMetadata(
/*seq=*/ 1,
"service/method",
Durations.fromMillis(1234),
nonEmptyMetadata,
IS_CLIENT,
CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setMethodName("service/method")
.setTimeout(Durations.fromMillis(1234))
.setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.build());
sinkWriterImpl.logSendInitialMetadata(
/*seq=*/ 1,
"service/method",
/*timeout=*/ null,
nonEmptyMetadata,
IS_CLIENT,
CALL_ID);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setMethodName("service/method")
.setType(GrpcLogEntry.Type.SEND_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.CLIENT)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -563,10 +605,36 @@ public final class BinlogHelperTest {
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
sinkWriterImpl.logRecvInitialMetadata(
/*seq=*/ 1, nonEmptyMetadata, IS_SERVER, CALL_ID, socketAddress);
/*seq=*/ 1,
"service/method",
Durations.fromMillis(1234),
nonEmptyMetadata,
IS_SERVER,
CALL_ID,
socketAddress);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setMethodName("service/method")
.setTimeout(Durations.fromMillis(1234))
.setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
.setPeer(BinlogHelper.socketToProto(socketAddress))
.build());
sinkWriterImpl.logRecvInitialMetadata(
/*seq=*/ 1,
"service/method",
/*timeout=*/ null,
nonEmptyMetadata,
IS_SERVER,
CALL_ID,
socketAddress);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
.setMethodName("service/method")
.setType(GrpcLogEntry.Type.RECV_INITIAL_METADATA)
.setLogger(GrpcLogEntry.Logger.SERVER)
.setCallId(BinlogHelper.callIdToProto(CALL_ID))
@ -580,7 +648,13 @@ public final class BinlogHelperTest {
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
sinkWriterImpl.logRecvInitialMetadata(
/*seq=*/ 1, nonEmptyMetadata, IS_CLIENT, CALL_ID, socketAddress);
/*seq=*/ 1,
/*methodNmae=*/ null,
/*timeout=*/ null,
nonEmptyMetadata,
IS_CLIENT,
CALL_ID,
socketAddress);
verify(sink).write(
metadataToProtoTestHelper(nonEmptyMetadata, 10).toBuilder()
.setSequenceIdWithinCall(1)
@ -719,6 +793,107 @@ public final class BinlogHelperTest {
getPeerSocket(Attributes.newBuilder().set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, peer).build()));
}
@Test
@SuppressWarnings({"unchecked"})
public void clientDeadlineLogged_deadlineSetViaCallOption() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
ClientCall<byte[], byte[]> call =
new BinlogHelper(mockSinkWriter)
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>();
}
@Override
public String authority() {
return null;
}
});
call.start(mockListener, new Metadata());
ArgumentCaptor<Duration> callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockSinkWriter).logSendInitialMetadata(
any(Integer.class),
any(String.class),
callOptTimeoutCaptor.capture(),
any(Metadata.class),
any(Boolean.class),
any(CallId.class));
Duration timeout = callOptTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - timeout.getNanos())
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
@SuppressWarnings({"unchecked"})
public void clientDeadlineLogged_deadlineSetViaContext() throws Exception {
// important: deadline is read from the ctx where call was created
final SettableFuture<ClientCall<byte[], byte[]>> callFuture = SettableFuture.create();
Context.current()
.withDeadline(
Deadline.after(1, TimeUnit.SECONDS), Executors.newSingleThreadScheduledExecutor())
.run(new Runnable() {
@Override
public void run() {
MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
.setFullMethodName("service/method")
.setRequestMarshaller(BYTEARRAY_MARSHALLER)
.setResponseMarshaller(BYTEARRAY_MARSHALLER)
.build();
callFuture.set(new BinlogHelper(mockSinkWriter)
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withOption(
BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, CALL_ID)
.withDeadlineAfter(1, TimeUnit.SECONDS),
new Channel() {
@Override
public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor,
CallOptions callOptions) {
return new NoopClientCall<RequestT, ResponseT>();
}
@Override
public String authority() {
return null;
}
}));
}
});
ClientCall.Listener<byte[]> mockListener = mock(ClientCall.Listener.class);
callFuture.get().start(mockListener, new Metadata());
ArgumentCaptor<Duration> callOptTimeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockSinkWriter).logSendInitialMetadata(
any(Integer.class),
any(String.class),
callOptTimeoutCaptor.capture(),
any(Metadata.class),
any(Boolean.class),
any(CallId.class));
Duration timeout = callOptTimeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - timeout.getNanos())
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
}
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void clientInterceptor() throws Exception {
@ -771,20 +946,25 @@ public final class BinlogHelperTest {
.getClientInterceptor(CALL_ID)
.interceptCall(
method,
CallOptions.DEFAULT.withOption(
BinaryLogProvider.CLIENT_CALL_ID_CALLOPTION_KEY, CALL_ID),
CallOptions.DEFAULT.withDeadlineAfter(1, TimeUnit.SECONDS),
channel);
// send initial metadata
{
Metadata clientInitial = new Metadata();
interceptedCall.start(mockListener, clientInitial);
ArgumentCaptor<Duration> timeoutCaptor = ArgumentCaptor.forClass(Duration.class);
verify(mockSinkWriter).logSendInitialMetadata(
/*seq=*/ eq(1),
eq("service/method"),
timeoutCaptor.capture(),
same(clientInitial),
eq(IS_CLIENT),
same(CALL_ID));
verifyNoMoreInteractions(mockSinkWriter);
Duration timeout = timeoutCaptor.getValue();
assertThat(TimeUnit.SECONDS.toNanos(1) - timeout.getNanos())
.isAtMost(TimeUnit.MILLISECONDS.toNanos(250));
assertSame(clientInitial, actualClientInitial.get());
}
@ -794,6 +974,8 @@ public final class BinlogHelperTest {
interceptedListener.get().onHeaders(serverInitial);
verify(mockSinkWriter).logRecvInitialMetadata(
/*seq=*/ eq(2),
isNull(String.class),
isNull(Duration.class),
same(serverInitial),
eq(IS_CLIENT),
same(CALL_ID),
@ -864,6 +1046,7 @@ public final class BinlogHelperTest {
// begin call and receive initial metadata
{
Metadata clientInitial = new Metadata();
clientInitial.put(GrpcUtil.TIMEOUT_KEY, TimeUnit.MILLISECONDS.toNanos(1234));
final MethodDescriptor<byte[], byte[]> method =
MethodDescriptor.<byte[], byte[]>newBuilder()
.setType(MethodType.UNKNOWN)
@ -917,6 +1100,8 @@ public final class BinlogHelperTest {
});
verify(mockSinkWriter).logRecvInitialMetadata(
/*seq=*/ eq(1),
eq("service/method"),
eq(Durations.fromMillis(1234)),
same(clientInitial),
eq(IS_SERVER),
same(CALL_ID),
@ -930,6 +1115,8 @@ public final class BinlogHelperTest {
interceptedCall.get().sendHeaders(serverInital);
verify(mockSinkWriter).logSendInitialMetadata(
/*seq=*/ eq(2),
isNull(String.class),
isNull(Duration.class),
same(serverInital),
eq(IS_SERVER),
same(CALL_ID));