diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index 7350de4f1f..448d691306 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -21,6 +21,7 @@ import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY; import static java.lang.Math.max; import com.google.common.base.MoreObjects; +import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; @@ -48,6 +49,7 @@ import io.grpc.internal.InsightBuilder; import io.grpc.internal.ManagedClientTransport; import io.grpc.internal.NoopClientStream; import io.grpc.internal.ObjectPool; +import io.grpc.internal.ServerListener; import io.grpc.internal.ServerStream; import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransport; @@ -79,6 +81,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans private final int clientMaxInboundMetadataSize; private final String authority; private final String userAgent; + private final Optional optionalServerListener; private int serverMaxInboundMetadataSize; private ObjectPool serverSchedulerPool; private ScheduledExecutorService serverScheduler; @@ -111,9 +114,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans } }; - public InProcessTransport( - String name, int maxInboundMetadataSize, String authority, String userAgent, - Attributes eagAttrs) { + private InProcessTransport(String name, int maxInboundMetadataSize, String authority, + String userAgent, Attributes eagAttrs, Optional optionalServerListener) { this.name = name; this.clientMaxInboundMetadataSize = maxInboundMetadataSize; this.authority = authority; @@ -125,21 +127,45 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name)) .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name)) .build(); + this.optionalServerListener = optionalServerListener; logId = InternalLogId.allocate(getClass(), name); } + public InProcessTransport( + String name, int maxInboundMetadataSize, String authority, String userAgent, + Attributes eagAttrs) { + this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, + Optional.absent()); + } + + InProcessTransport( + String name, int maxInboundMetadataSize, String authority, String userAgent, + Attributes eagAttrs, ObjectPool serverSchedulerPool, + List serverStreamTracerFactories, + ServerListener serverListener) { + this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener)); + this.serverMaxInboundMetadataSize = maxInboundMetadataSize; + this.serverSchedulerPool = serverSchedulerPool; + this.serverStreamTracerFactories = serverStreamTracerFactories; + } + @CheckReturnValue @Override public synchronized Runnable start(ManagedClientTransport.Listener listener) { this.clientTransportListener = listener; - InProcessServer server = InProcessServer.findServer(name); - if (server != null) { - serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize(); - serverSchedulerPool = server.getScheduledExecutorServicePool(); + if (optionalServerListener.isPresent()) { serverScheduler = serverSchedulerPool.getObject(); - serverStreamTracerFactories = server.getStreamTracerFactories(); - // Must be semi-initialized; past this point, can begin receiving requests - serverTransportListener = server.register(this); + serverTransportListener = optionalServerListener.get().transportCreated(this); + } else { + InProcessServer server = InProcessServer.findServer(name); + if (server != null) { + serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize(); + serverSchedulerPool = server.getScheduledExecutorServicePool(); + serverScheduler = serverSchedulerPool.getObject(); + serverStreamTracerFactories = server.getStreamTracerFactories(); + // Must be semi-initialized; past this point, can begin receiving requests + serverTransportListener = server.register(this); + } } if (serverTransportListener == null) { shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name); diff --git a/core/src/main/java/io/grpc/inprocess/InternalInProcess.java b/core/src/main/java/io/grpc/inprocess/InternalInProcess.java new file mode 100644 index 0000000000..021b07a80b --- /dev/null +++ b/core/src/main/java/io/grpc/inprocess/InternalInProcess.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.inprocess; + +import io.grpc.Attributes; +import io.grpc.Internal; +import io.grpc.ServerStreamTracer; +import io.grpc.internal.ConnectionClientTransport; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.ServerListener; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +/** + * Internal {@link InProcessTransport} accessor. + * + *

This is intended for use by io.grpc.internal, and the specifically + * supported transport packages. + */ +@Internal +public final class InternalInProcess { + + private InternalInProcess() {} + + /** + * Creates a new InProcessTransport. + * + *

When started, the transport will be registered with the given + * {@link ServerListener}. + */ + @Internal + public static ConnectionClientTransport createInProcessTransport( + String name, + int maxInboundMetadataSize, + String authority, + String userAgent, + Attributes eagAttrs, + ObjectPool serverSchedulerPool, + List serverStreamTracerFactories, + ServerListener serverListener) { + return new InProcessTransport( + name, + maxInboundMetadataSize, + authority, + userAgent, + eagAttrs, + serverSchedulerPool, + serverStreamTracerFactories, + serverListener); + } +} diff --git a/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java new file mode 100644 index 0000000000..6d8f3a1ca4 --- /dev/null +++ b/core/src/test/java/io/grpc/inprocess/StandaloneInProcessTransportTest.java @@ -0,0 +1,159 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.inprocess; + +import com.google.common.collect.ImmutableList; +import io.grpc.InternalChannelz.SocketStats; +import io.grpc.InternalInstrumented; +import io.grpc.ServerStreamTracer; +import io.grpc.internal.AbstractTransportTest; +import io.grpc.internal.GrpcUtil; +import io.grpc.internal.InternalServer; +import io.grpc.internal.ManagedClientTransport; +import io.grpc.internal.ObjectPool; +import io.grpc.internal.ServerListener; +import io.grpc.internal.ServerTransport; +import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SharedResourcePool; +import java.io.IOException; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nullable; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link InProcessTransport} when used with a separate {@link InternalServer}. */ +@RunWith(JUnit4.class) +public final class StandaloneInProcessTransportTest extends AbstractTransportTest { + private static final String TRANSPORT_NAME = "perfect-for-testing"; + private static final String AUTHORITY = "a-testing-authority"; + private static final String USER_AGENT = "a-testing-user-agent"; + + private final ObjectPool schedulerPool = + SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE); + + private TestServer currentServer; + + @Override + protected List newServer( + List streamTracerFactories) { + return ImmutableList.of(new TestServer(streamTracerFactories)); + } + + @Override + protected List newServer( + int port, List streamTracerFactories) { + return newServer(streamTracerFactories); + } + + @Override + protected String testAuthority(InternalServer server) { + return AUTHORITY; + } + + @Override + protected ManagedClientTransport newClientTransport(InternalServer server) { + TestServer testServer = (TestServer) server; + return InternalInProcess.createInProcessTransport( + TRANSPORT_NAME, + GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, + testAuthority(server), + USER_AGENT, + eagAttrs(), + schedulerPool, + testServer.streamTracerFactories, + testServer.serverListener); + } + + @Override + protected boolean sizesReported() { + // TODO(zhangkun83): InProcessTransport doesn't record metrics for now + // (https://github.com/grpc/grpc-java/issues/2284) + return false; + } + + @Test + @Ignore + @Override + public void socketStats() throws Exception { + // test does not apply to in-process + } + + /** An internalserver just for this test. */ + private final class TestServer implements InternalServer { + + final List streamTracerFactories; + ServerListener serverListener; + + TestServer(List streamTracerFactories) { + this.streamTracerFactories = streamTracerFactories; + } + + @Override + public void start(ServerListener serverListener) throws IOException { + if (currentServer != null) { + throw new IOException("Server already present"); + } + currentServer = this; + this.serverListener = new ServerListenerWrapper(serverListener); + } + + @Override + public void shutdown() { + currentServer = null; + serverListener.serverShutdown(); + } + + @Override + public SocketAddress getListenSocketAddress() { + return new SocketAddress() {}; + } + + @Override + @Nullable + public InternalInstrumented getListenSocketStats() { + return null; + } + } + + /** Wraps the server listener to ensure we don't accept new transports after shutdown. */ + private static final class ServerListenerWrapper implements ServerListener { + private final ServerListener delegateListener; + private boolean shutdown; + + ServerListenerWrapper(ServerListener delegateListener) { + this.delegateListener = delegateListener; + } + + @Override + public ServerTransportListener transportCreated(ServerTransport transport) { + if (shutdown) { + return null; + } + return delegateListener.transportCreated(transport); + } + + @Override + public void serverShutdown() { + shutdown = true; + delegateListener.serverShutdown(); + } + } +}