netty,okhttp,testing: always set TRANSPORT_ATTR_REMOTE_ADDR (#4217)

Always set the remote address, no reason why this should be a TLS-only
feature. This is needed for channelz, and is especially useful in unit
tests where we are using plaintext.

This PR adds the attr for plaintext.
This commit is contained in:
zpencer 2018-03-23 15:44:40 -07:00 committed by GitHub
parent ae42d666ad
commit a5b55bb24c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 80 additions and 46 deletions

View File

@ -329,7 +329,7 @@ public final class ProtocolNegotiators {
HttpClientCodec httpClientCodec = new HttpClientCodec();
final HttpClientUpgradeHandler upgrader =
new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
return new BufferingHttp2UpgradeHandler(upgrader);
return new BufferingHttp2UpgradeHandler(upgrader, handler);
}
}
@ -662,8 +662,11 @@ public final class ProtocolNegotiators {
private static class BufferUntilChannelActiveHandler extends AbstractBufferingHandler
implements ProtocolNegotiator.Handler {
BufferUntilChannelActiveHandler(ChannelHandler... handlers) {
super(handlers);
private final GrpcHttp2ConnectionHandler handler;
BufferUntilChannelActiveHandler(GrpcHttp2ConnectionHandler handler) {
super(handler);
this.handler = handler;
}
@Override
@ -679,6 +682,11 @@ public final class ProtocolNegotiators {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
writeBufferedAndRemove(ctx);
handler.handleProtocolNegotiationCompleted(
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.build());
super.channelActive(ctx);
}
}
@ -689,8 +697,11 @@ public final class ProtocolNegotiators {
private static class BufferingHttp2UpgradeHandler extends AbstractBufferingHandler
implements ProtocolNegotiator.Handler {
BufferingHttp2UpgradeHandler(ChannelHandler... handlers) {
super(handlers);
private final GrpcHttp2ConnectionHandler grpcHandler;
BufferingHttp2UpgradeHandler(ChannelHandler handler, GrpcHttp2ConnectionHandler grpcHandler) {
super(handler);
this.grpcHandler = grpcHandler;
}
@Override
@ -712,6 +723,11 @@ public final class ProtocolNegotiators {
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
writeBufferedAndRemove(ctx);
grpcHandler.handleProtocolNegotiationCompleted(
Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, ctx.channel().remoteAddress())
.build());
} else if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_REJECTED) {
fail(ctx, unavailableException("HTTP/2 upgrade rejected"));
}

View File

@ -58,6 +58,7 @@ class OkHttpClientStream extends AbstractClientStream {
private volatile int id = ABSENT_ID;
private final TransportState state;
private final Sink sink = new Sink();
private final Attributes attributes;
private boolean useGet = false;
@ -83,6 +84,10 @@ class OkHttpClientStream extends AbstractClientStream {
this.method = method;
this.authority = authority;
this.userAgent = userAgent;
// OkHttpClientStream is only created after the transport has finished connecting,
// so it is safe to read the transport attributes.
// We make a copy here for convenience, even though we can ask the transport.
this.attributes = transport.getAttributes();
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock, frameWriter, outboundFlow,
transport);
}
@ -123,7 +128,7 @@ class OkHttpClientStream extends AbstractClientStream {
@Override
public Attributes getAttributes() {
return Attributes.EMPTY;
return attributes;
}
class Sink implements AbstractClientStream.Sink {

View File

@ -31,6 +31,7 @@ import com.squareup.okhttp.Request;
import com.squareup.okhttp.internal.http.StatusLine;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Grpc;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
@ -149,6 +150,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
private final int maxMessageSize;
private int connectionUnacknowledgedBytesRead;
private ClientFrameHandler clientFrameHandler;
// Caution: Not synchronized, new value can only be safely read after the connection is complete.
private Attributes attributes = Attributes.EMPTY;
/**
* Indicates the transport is in go-away state: no new streams will be processed, but existing
* streams may continue.
@ -467,6 +470,12 @@ class OkHttpClientTransport implements ConnectionClientTransport {
sock.setTcpNoDelay(true);
source = Okio.buffer(Okio.source(sock));
sink = Okio.buffer(Okio.sink(sock));
// TODO(zhangkun83): fill channel security attributes
// The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
attributes = Attributes
.newBuilder()
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
.build();
} catch (StatusException e) {
startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
return;
@ -482,6 +491,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
synchronized (lock) {
socket = sock;
maxConcurrentStreams = Integer.MAX_VALUE;
startPendingStreams();
}
@ -675,8 +685,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
@Override
public Attributes getAttributes() {
// TODO(zhangkun83): fill channel security attributes
return Attributes.EMPTY;
return attributes;
}
/**

View File

@ -68,8 +68,6 @@ import io.grpc.internal.ServerTransportListener;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
@ -231,7 +229,7 @@ public abstract class AbstractTransportTest {
public void frameAfterRstStreamShouldNotBreakClientChannel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -298,7 +296,7 @@ public abstract class AbstractTransportTest {
server.start(serverListener);
client = newClientTransport(server);
InOrder inOrder = inOrder(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportTerminated();
@ -312,7 +310,7 @@ public abstract class AbstractTransportTest {
server.start(serverListener);
client = newClientTransport(server);
InOrder inOrder = inOrder(mockClientTransportListener);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
@ -340,7 +338,7 @@ public abstract class AbstractTransportTest {
public void openStreamPreventsTermination() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -392,7 +390,7 @@ public abstract class AbstractTransportTest {
public void shutdownNowKillsClientStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -429,7 +427,7 @@ public abstract class AbstractTransportTest {
public void shutdownNowKillsServerStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -470,7 +468,7 @@ public abstract class AbstractTransportTest {
public void ping() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
ClientTransport.PingCallback mockPingCallback = mock(ClientTransport.PingCallback.class);
try {
client.ping(mockPingCallback, MoreExecutors.directExecutor());
@ -486,7 +484,7 @@ public abstract class AbstractTransportTest {
public void ping_duringShutdown() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
// Stream prevents termination
ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
@ -508,7 +506,7 @@ public abstract class AbstractTransportTest {
public void ping_afterTermination() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
@ -530,7 +528,7 @@ public abstract class AbstractTransportTest {
InOrder inOrder = inOrder(clientStreamTracerFactory);
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
// Stream prevents termination
ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(
@ -570,7 +568,7 @@ public abstract class AbstractTransportTest {
// dealing with afterTermination is harder than duringShutdown.
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
verify(mockClientTransportListener, timeout(TIMEOUT_MS)).transportReady();
Status shutdownReason = Status.UNAVAILABLE.withDescription("shutdown called");
client.shutdown(shutdownReason);
@ -594,7 +592,7 @@ public abstract class AbstractTransportTest {
public void transportInUse_normalClose() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase();
stream1.start(clientStreamListener1);
@ -624,7 +622,7 @@ public abstract class AbstractTransportTest {
public void transportInUse_clientCancel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
ClientStream stream1 = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener1 = new ClientStreamListenerBase();
stream1.start(clientStreamListener1);
@ -649,7 +647,7 @@ public abstract class AbstractTransportTest {
InOrder serverInOrder = inOrder(serverStreamTracerFactory);
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -804,7 +802,7 @@ public abstract class AbstractTransportTest {
public void authorityPropagation() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
@ -823,7 +821,7 @@ public abstract class AbstractTransportTest {
public void zeroMessageStream() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -859,7 +857,7 @@ public abstract class AbstractTransportTest {
public void earlyServerClose_withServerHeaders() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -894,7 +892,7 @@ public abstract class AbstractTransportTest {
public void earlyServerClose_noServerHeaders() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -938,7 +936,7 @@ public abstract class AbstractTransportTest {
public void earlyServerClose_serverFailure() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -1016,7 +1014,7 @@ public abstract class AbstractTransportTest {
public void clientCancel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -1047,7 +1045,7 @@ public abstract class AbstractTransportTest {
public void clientCancelFromWithinMessageRead() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -1134,7 +1132,7 @@ public abstract class AbstractTransportTest {
public void serverCancel() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -1182,7 +1180,7 @@ public abstract class AbstractTransportTest {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener =
serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -1343,7 +1341,7 @@ public abstract class AbstractTransportTest {
public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -1375,7 +1373,7 @@ public abstract class AbstractTransportTest {
public void interactionsAfterClientStreamCancelAreNoops() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
serverTransport = serverTransportListener.transport;
@ -1411,7 +1409,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_streamStarted() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
MockServerTransportListener serverTransportListener
= serverListener.takeListenerOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (!haveTransportTracer()) {
@ -1496,7 +1494,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_server_streamEnded_ok() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1535,7 +1533,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_server_streamEnded_nonOk() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1575,7 +1573,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_client_streamEnded_nonOk() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1610,7 +1608,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_server_receive_msg() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1655,7 +1653,7 @@ public abstract class AbstractTransportTest {
public void transportTracer_server_send_msg() throws Exception {
server.start(serverListener);
client = newClientTransport(server);
runIfNotNull(client.start(mockClientTransportListener));
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1700,7 +1698,7 @@ public abstract class AbstractTransportTest {
public void socketStats_addresses() throws Exception {
server.start(serverListener);
ManagedClientTransport client = newClientTransport(server);
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
startTransport(client, mockClientTransportListener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1711,9 +1709,7 @@ public abstract class AbstractTransportTest {
= serverTransportListener.takeStreamOrFail(TIMEOUT_MS, TimeUnit.MILLISECONDS);
ServerStream serverStream = serverStreamCreation.stream;
// clients do not have TRANSPORT_ATTR_REMOTE_ADDR so use a hack for serverAddress
SocketAddress serverAddress
= new InetSocketAddress(InetAddress.getByName("127.0.0.1"), server.getPort());
SocketAddress serverAddress = clientStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
SocketAddress clientAddress = serverStream.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
SocketStats clientSocketStats = client.getStats().get();
@ -1733,7 +1729,8 @@ public abstract class AbstractTransportTest {
*/
private void doPingPong(MockServerListener serverListener) throws Exception {
ManagedClientTransport client = newClientTransport(server);
runIfNotNull(client.start(mock(ManagedClientTransport.Listener.class)));
ManagedClientTransport.Listener listener = mock(ManagedClientTransport.Listener.class);
startTransport(client, listener);
ClientStream clientStream = client.newStream(methodDescriptor, new Metadata(), callOptions);
ClientStreamListenerBase clientStreamListener = new ClientStreamListenerBase();
clientStream.start(clientStreamListener);
@ -1798,6 +1795,13 @@ public abstract class AbstractTransportTest {
}
}
private static void startTransport(
ManagedClientTransport clientTransport,
ManagedClientTransport.Listener listener) {
runIfNotNull(clientTransport.start(listener));
verify(listener, timeout(100)).transportReady();
}
private static class MockServerListener implements ServerListener {
public final BlockingQueue<MockServerTransportListener> listeners
= new LinkedBlockingQueue<MockServerTransportListener>();