Enable use of InProcessTransport outside of InProcessServer

This allows an InProcessTransport instance to be created via a new
internal accessor class InternalInProcess. We effectively just expose a
method to create an InProcessTransport with a existing ServerListener
instance.

This will be used for in-process channels to an under-development
on-device server.
This commit is contained in:
markb74 2020-03-03 00:02:17 +00:00 committed by GitHub
parent afc1f2e567
commit fd5f4aac63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 260 additions and 10 deletions

View File

@ -21,6 +21,7 @@ import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
import static java.lang.Math.max;
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
@ -48,6 +49,7 @@ import io.grpc.internal.InsightBuilder;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.NoopClientStream;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerStream;
import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport;
@ -79,6 +81,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private final int clientMaxInboundMetadataSize;
private final String authority;
private final String userAgent;
private final Optional<ServerListener> optionalServerListener;
private int serverMaxInboundMetadataSize;
private ObjectPool<ScheduledExecutorService> serverSchedulerPool;
private ScheduledExecutorService serverScheduler;
@ -111,9 +114,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
}
};
public InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs) {
private InProcessTransport(String name, int maxInboundMetadataSize, String authority,
String userAgent, Attributes eagAttrs, Optional<ServerListener> optionalServerListener) {
this.name = name;
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
this.authority = authority;
@ -125,21 +127,45 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
.set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, new InProcessSocketAddress(name))
.set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, new InProcessSocketAddress(name))
.build();
this.optionalServerListener = optionalServerListener;
logId = InternalLogId.allocate(getClass(), name);
}
public InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
Optional.<ServerListener>absent());
}
InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
Attributes eagAttrs, ObjectPool<ScheduledExecutorService> serverSchedulerPool,
List<ServerStreamTracer.Factory> serverStreamTracerFactories,
ServerListener serverListener) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener));
this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
this.serverSchedulerPool = serverSchedulerPool;
this.serverStreamTracerFactories = serverStreamTracerFactories;
}
@CheckReturnValue
@Override
public synchronized Runnable start(ManagedClientTransport.Listener listener) {
this.clientTransportListener = listener;
InProcessServer server = InProcessServer.findServer(name);
if (server != null) {
serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
serverSchedulerPool = server.getScheduledExecutorServicePool();
if (optionalServerListener.isPresent()) {
serverScheduler = serverSchedulerPool.getObject();
serverStreamTracerFactories = server.getStreamTracerFactories();
// Must be semi-initialized; past this point, can begin receiving requests
serverTransportListener = server.register(this);
serverTransportListener = optionalServerListener.get().transportCreated(this);
} else {
InProcessServer server = InProcessServer.findServer(name);
if (server != null) {
serverMaxInboundMetadataSize = server.getMaxInboundMetadataSize();
serverSchedulerPool = server.getScheduledExecutorServicePool();
serverScheduler = serverSchedulerPool.getObject();
serverStreamTracerFactories = server.getStreamTracerFactories();
// Must be semi-initialized; past this point, can begin receiving requests
serverTransportListener = server.register(this);
}
}
if (serverTransportListener == null) {
shutdownStatus = Status.UNAVAILABLE.withDescription("Could not find server: " + name);

View File

@ -0,0 +1,65 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.inprocess;
import io.grpc.Attributes;
import io.grpc.Internal;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
/**
* Internal {@link InProcessTransport} accessor.
*
* <p>This is intended for use by io.grpc.internal, and the specifically
* supported transport packages.
*/
@Internal
public final class InternalInProcess {
private InternalInProcess() {}
/**
* Creates a new InProcessTransport.
*
* <p>When started, the transport will be registered with the given
* {@link ServerListener}.
*/
@Internal
public static ConnectionClientTransport createInProcessTransport(
String name,
int maxInboundMetadataSize,
String authority,
String userAgent,
Attributes eagAttrs,
ObjectPool<ScheduledExecutorService> serverSchedulerPool,
List<ServerStreamTracer.Factory> serverStreamTracerFactories,
ServerListener serverListener) {
return new InProcessTransport(
name,
maxInboundMetadataSize,
authority,
userAgent,
eagAttrs,
serverSchedulerPool,
serverStreamTracerFactories,
serverListener);
}
}

View File

@ -0,0 +1,159 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.inprocess;
import com.google.common.collect.ImmutableList;
import io.grpc.InternalChannelz.SocketStats;
import io.grpc.InternalInstrumented;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractTransportTest;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourcePool;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link InProcessTransport} when used with a separate {@link InternalServer}. */
@RunWith(JUnit4.class)
public final class StandaloneInProcessTransportTest extends AbstractTransportTest {
private static final String TRANSPORT_NAME = "perfect-for-testing";
private static final String AUTHORITY = "a-testing-authority";
private static final String USER_AGENT = "a-testing-user-agent";
private final ObjectPool<ScheduledExecutorService> schedulerPool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
private TestServer currentServer;
@Override
protected List<? extends InternalServer> newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
return ImmutableList.of(new TestServer(streamTracerFactories));
}
@Override
protected List<? extends InternalServer> newServer(
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
return newServer(streamTracerFactories);
}
@Override
protected String testAuthority(InternalServer server) {
return AUTHORITY;
}
@Override
protected ManagedClientTransport newClientTransport(InternalServer server) {
TestServer testServer = (TestServer) server;
return InternalInProcess.createInProcessTransport(
TRANSPORT_NAME,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
testAuthority(server),
USER_AGENT,
eagAttrs(),
schedulerPool,
testServer.streamTracerFactories,
testServer.serverListener);
}
@Override
protected boolean sizesReported() {
// TODO(zhangkun83): InProcessTransport doesn't record metrics for now
// (https://github.com/grpc/grpc-java/issues/2284)
return false;
}
@Test
@Ignore
@Override
public void socketStats() throws Exception {
// test does not apply to in-process
}
/** An internalserver just for this test. */
private final class TestServer implements InternalServer {
final List<ServerStreamTracer.Factory> streamTracerFactories;
ServerListener serverListener;
TestServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
this.streamTracerFactories = streamTracerFactories;
}
@Override
public void start(ServerListener serverListener) throws IOException {
if (currentServer != null) {
throw new IOException("Server already present");
}
currentServer = this;
this.serverListener = new ServerListenerWrapper(serverListener);
}
@Override
public void shutdown() {
currentServer = null;
serverListener.serverShutdown();
}
@Override
public SocketAddress getListenSocketAddress() {
return new SocketAddress() {};
}
@Override
@Nullable
public InternalInstrumented<SocketStats> getListenSocketStats() {
return null;
}
}
/** Wraps the server listener to ensure we don't accept new transports after shutdown. */
private static final class ServerListenerWrapper implements ServerListener {
private final ServerListener delegateListener;
private boolean shutdown;
ServerListenerWrapper(ServerListener delegateListener) {
this.delegateListener = delegateListener;
}
@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
if (shutdown) {
return null;
}
return delegateListener.transportCreated(transport);
}
@Override
public void serverShutdown() {
shutdown = true;
delegateListener.serverShutdown();
}
}
}