binder: Server and Channel Builders for BinderChannel. (#8218)

binder: Server and Channel Builders for BinderChannel.

Also adds 3 additional tests.
This commit is contained in:
markb74 2021-06-07 19:39:20 +02:00 committed by GitHub
parent a6d78c5e3e
commit 0c723f7ca9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1236 additions and 8 deletions

View File

@ -49,6 +49,7 @@ dependencies {
guavaDependency 'implementation'
implementation libraries.androidx_annotation
implementation libraries.androidx_core
testImplementation libraries.androidx_core
testImplementation libraries.androidx_test
testImplementation libraries.junit
@ -64,6 +65,7 @@ dependencies {
androidTestAnnotationProcessor libraries.autovalue
androidTestImplementation project(':grpc-testing')
androidTestImplementation project(':grpc-protobuf-lite')
androidTestImplementation libraries.autovalue_annotation
androidTestImplementation libraries.junit
androidTestImplementation libraries.androidx_core
@ -73,6 +75,9 @@ dependencies {
androidTestImplementation libraries.truth
androidTestImplementation libraries.mockito_android
androidTestImplementation libraries.androidx_lifecycle_service
androidTestImplementation (libraries.guava_testlib) {
exclude group: 'junit', module: 'junit'
}
}
import net.ltgt.gradle.errorprone.CheckSeverity

View File

@ -0,0 +1,176 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.binder;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import android.app.Application;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.os.IBinder;
import androidx.test.core.app.ApplicationProvider;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ServerCalls;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
* Basic tests for Binder Channel, covering some of the edge cases not exercised by
* AbstractTransportTest.
*/
@RunWith(AndroidJUnit4.class)
public final class BinderChannelSmokeTest {
private final Context appContext = ApplicationProvider.getApplicationContext();
private static final int SLIGHTLY_MORE_THAN_ONE_BLOCK = 16 * 1024 + 100;
private static final String MSG = "Some text which will be repeated many many times";
final MethodDescriptor<String, String> method =
MethodDescriptor.newBuilder(StringMarshaller.INSTANCE, StringMarshaller.INSTANCE)
.setFullMethodName("test/method")
.setType(MethodDescriptor.MethodType.UNARY)
.build();
final MethodDescriptor<String, String> singleLargeResultMethod =
MethodDescriptor.newBuilder(StringMarshaller.INSTANCE, StringMarshaller.INSTANCE)
.setFullMethodName("test/noResultMethod")
.setType(MethodDescriptor.MethodType.SERVER_STREAMING)
.build();
AndroidComponentAddress serverAddress;
ManagedChannel channel;
@Before
public void setUp() throws Exception {
ServerCallHandler<String, String> callHandler =
ServerCalls.asyncUnaryCall(
(req, respObserver) -> {
respObserver.onNext(req);
respObserver.onCompleted();
});
ServerCallHandler<String, String> singleLargeResultCallHandler =
ServerCalls.asyncUnaryCall(
(req, respObserver) -> {
respObserver.onNext(createLargeString(SLIGHTLY_MORE_THAN_ONE_BLOCK));
respObserver.onCompleted();
});
ServerServiceDefinition serviceDef =
ServerServiceDefinition.builder("test")
.addMethod(method, callHandler)
.addMethod(singleLargeResultMethod, singleLargeResultCallHandler)
.build();
AndroidComponentAddress serverAddress = HostServices.allocateService(appContext);
HostServices.configureService(serverAddress,
HostServices.serviceParamsBuilder()
.setServerFactory((service, receiver) ->
BinderServerBuilder.forService(service, receiver)
.addService(serviceDef)
.build())
.build());
channel = BinderChannelBuilder.forAddress(serverAddress, appContext).build();
}
@After
public void tearDown() throws Exception {
channel.shutdownNow();
HostServices.awaitServiceShutdown();
}
private ListenableFuture<String> doCall(String request) {
return doCall(method, request);
}
private ListenableFuture<String> doCall(
MethodDescriptor<String, String> methodDesc, String request) {
ListenableFuture<String> future =
ClientCalls.futureUnaryCall(channel.newCall(methodDesc, CallOptions.DEFAULT), request);
return Futures.withTimeout(
future, 5L, TimeUnit.SECONDS, Executors.newSingleThreadScheduledExecutor());
}
@Test
public void testBasicCall() throws Exception {
assertThat(doCall("Hello").get()).isEqualTo("Hello");
}
@Test
public void testEmptyMessage() throws Exception {
assertThat(doCall("").get()).isEmpty();
}
@Test
public void test100kString() throws Exception {
String fullMsg = createLargeString(100000);
assertThat(doCall(fullMsg).get()).isEqualTo(fullMsg);
}
@Test
public void testSingleLargeResultCall() throws Exception {
String res = doCall(singleLargeResultMethod, "hello").get();
assertThat(res.length()).isEqualTo(SLIGHTLY_MORE_THAN_ONE_BLOCK);
}
private static String createLargeString(int size) {
StringBuilder sb = new StringBuilder();
while (sb.length() < size) {
sb.append(MSG);
}
sb.setLength(size);
return sb.toString();
}
private static class StringMarshaller implements MethodDescriptor.Marshaller<String> {
public static final StringMarshaller INSTANCE = new StringMarshaller();
@Override
public InputStream stream(String value) {
return new ByteArrayInputStream(value.getBytes(UTF_8));
}
@Override
public String parse(InputStream stream) {
try {
return new String(ByteStreams.toByteArray(stream), UTF_8);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
}

View File

@ -0,0 +1,193 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.binder;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.os.IBinder;
import androidx.test.core.app.ApplicationProvider;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.base.Function;
import com.google.protobuf.Empty;
import io.grpc.CallOptions;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.lite.ProtoLiteUtils;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.ServerCalls;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AndroidJUnit4.class)
public final class BinderSecurityTest {
private final Context appContext = ApplicationProvider.getApplicationContext();
String[] serviceNames = new String[] {"foo", "bar", "baz"};
@Nullable ManagedChannel channel;
Map<String, MethodDescriptor<Empty, Empty>> methods = new HashMap<>();
List<MethodDescriptor<Empty, Empty>> calls = new ArrayList<>();
@After
public void tearDown() throws Exception {
if (channel != null) {
channel.shutdownNow();
}
HostServices.awaitServiceShutdown();
}
private void createChannel() throws Exception {
createChannel(SecurityPolicies.serverInternalOnly(), SecurityPolicies.internalOnly());
}
private void createChannel(ServerSecurityPolicy serverPolicy, SecurityPolicy channelPolicy)
throws Exception {
AndroidComponentAddress addr = HostServices.allocateService(appContext);
HostServices.configureService(addr,
HostServices.serviceParamsBuilder()
.setServerFactory((service, receiver) -> buildServer(service, receiver, serverPolicy))
.build());
channel =
BinderChannelBuilder.forAddress(addr, appContext)
.securityPolicy(channelPolicy)
.build();
}
private Server buildServer(
Service service,
IBinderReceiver receiver,
ServerSecurityPolicy serverPolicy) {
BinderServerBuilder serverBuilder = BinderServerBuilder.forService(service, receiver);
serverBuilder.securityPolicy(serverPolicy);
MethodDescriptor.Marshaller<Empty> marshaller =
ProtoLiteUtils.marshaller(Empty.getDefaultInstance());
for (String serviceName : serviceNames) {
ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(serviceName);
for (int i = 0; i < 2; i++) {
// Add two methods to the service.
String name = serviceName + "/method" + i;
MethodDescriptor<Empty, Empty> method =
MethodDescriptor.newBuilder(marshaller, marshaller)
.setFullMethodName(name)
.setType(MethodDescriptor.MethodType.UNARY)
.build();
ServerCallHandler<Empty, Empty> callHandler =
ServerCalls.asyncUnaryCall(
(req, respObserver) -> {
calls.add(method);
respObserver.onNext(req);
respObserver.onCompleted();
});
builder.addMethod(method, callHandler);
methods.put(name, method);
}
serverBuilder.addService(builder.build());
}
return serverBuilder.build();
}
private void assertCallSuccess(MethodDescriptor<Empty, Empty> method) {
assertThat(
ClientCalls.blockingUnaryCall(
channel, method, CallOptions.DEFAULT, Empty.getDefaultInstance()))
.isNotNull();
}
private void assertCallFailure(MethodDescriptor<Empty, Empty> method, Status status) {
try {
ClientCalls.blockingUnaryCall(channel, method, CallOptions.DEFAULT, null);
fail();
} catch (StatusRuntimeException sre) {
assertThat(sre.getStatus().getCode()).isEqualTo(status.getCode());
}
}
@Test
public void testAllowedCall() throws Exception {
createChannel();
for (MethodDescriptor<Empty, Empty> method : methods.values()) {
assertCallSuccess(method);
}
}
@Test
public void testServerDisllowsCalls() throws Exception {
createChannel(
ServerSecurityPolicy.newBuilder()
.servicePolicy("foo", policy((uid) -> false))
.servicePolicy("bar", policy((uid) -> false))
.servicePolicy("baz", policy((uid) -> false))
.build(),
SecurityPolicies.internalOnly());
for (MethodDescriptor<Empty, Empty> method : methods.values()) {
assertCallFailure(method, Status.PERMISSION_DENIED);
}
}
@Test
public void testClientDoesntTrustServer() throws Exception {
createChannel(SecurityPolicies.serverInternalOnly(), policy((uid) -> false));
for (MethodDescriptor<Empty, Empty> method : methods.values()) {
assertCallFailure(method, Status.PERMISSION_DENIED);
}
}
@Test
public void testPerServicePolicy() throws Exception {
createChannel(
ServerSecurityPolicy.newBuilder()
.servicePolicy("foo", policy((uid) -> true))
.servicePolicy("bar", policy((uid) -> false))
.build(),
SecurityPolicies.internalOnly());
for (MethodDescriptor<Empty, Empty> method : methods.values()) {
if (method.getServiceName().equals("bar")) {
assertCallFailure(method, Status.PERMISSION_DENIED);
} else {
assertCallSuccess(method);
}
}
}
private static SecurityPolicy policy(Function<Integer, Boolean> func) {
return new SecurityPolicy() {
@Override
public Status checkAuthorization(int uid) {
return func.apply(uid) ? Status.OK : Status.PERMISSION_DENIED;
}
};
}
}

View File

@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.SECONDS;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.os.Binder;
@ -31,10 +32,12 @@ import com.google.auto.value.AutoValue;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import io.grpc.NameResolver;
import io.grpc.Server;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
import io.grpc.binder.AndroidComponentAddress;
import io.grpc.internal.InternalServer;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -61,6 +64,11 @@ public final class HostServices {
HostService1.class, HostService2.class,
};
public interface ServerFactory {
Server createServer(Service service, IBinderReceiver receiver);
}
@AutoValue
public abstract static class ServiceParams {
@Nullable
@ -69,6 +77,9 @@ public final class HostServices {
@Nullable
abstract Supplier<IBinder> rawBinderSupplier();
@Nullable
abstract ServerFactory serverFactory();
public abstract Builder toBuilder();
public static Builder builder() {
@ -78,6 +89,9 @@ public final class HostServices {
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setRawBinderSupplier(Supplier<IBinder> binderSupplier);
public abstract Builder setServerFactory(ServerFactory serverFactory);
/**
* If set, this executor will be used to pass any inbound transactions to the server. This can
* be used to simulate delayed, re-ordered, or dropped packets.
@ -185,6 +199,7 @@ public final class HostServices {
@Nullable private ServiceParams params;
@Nullable private Supplier<IBinder> binderSupplier;
@Nullable private Server server;
@Override
public final void onCreate() {
@ -195,7 +210,22 @@ public final class HostServices {
activeServices.put(cls, this);
checkState(serviceParams.containsKey(cls));
params = serviceParams.get(cls);
ServerFactory factory = params.serverFactory();
if (factory != null) {
IBinderReceiver receiver = new IBinderReceiver();
server = factory.createServer(this, receiver);
try {
server.start();
} catch (IOException ioe) {
throw new AssertionError("Failed to start server", ioe);
}
binderSupplier = () -> receiver.get();
} else {
binderSupplier = params.rawBinderSupplier();
if (binderSupplier == null) {
throw new AssertionError("Insufficient params for host service");
}
}
}
}
@ -217,6 +247,10 @@ public final class HostServices {
@Override
public final void onDestroy() {
synchronized (HostServices.class) {
if (server != null) {
server.shutdown();
server = null;
}
HostService removed = activeServices.remove(getClass());
checkState(removed == this);
serviceAddresses.remove(getClass());

View File

@ -0,0 +1,338 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.binder.internal;
import static com.google.common.truth.Truth.assertThat;
import android.app.Service;
import android.content.Context;
import android.content.Intent;
import android.os.IBinder;
import android.os.Parcel;
import androidx.core.content.ContextCompat;
import androidx.test.core.app.ApplicationProvider;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.util.concurrent.testing.TestingExecutors;
import com.google.protobuf.Empty;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.binder.AndroidComponentAddress;
import io.grpc.binder.BinderServerBuilder;
import io.grpc.binder.BindServiceFlags;
import io.grpc.binder.HostServices;
import io.grpc.binder.IBinderReceiver;
import io.grpc.binder.InboundParcelablePolicy;
import io.grpc.binder.SecurityPolicies;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.StreamListener;
import io.grpc.protobuf.lite.ProtoLiteUtils;
import io.grpc.stub.ServerCalls;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
* Client-side transport tests for binder channel. Like BinderChannelSmokeTest, this covers edge
* cases not exercised by AbstractTransportTest, but in this case we're dealing with rare ordering
* issues at the transport level, so we use a BinderTransport.BinderClientTransport directly, rather
* than a channel.
*/
@RunWith(AndroidJUnit4.class)
public final class BinderClientTransportTest {
private final Context appContext = ApplicationProvider.getApplicationContext();
MethodDescriptor.Marshaller<Empty> marshaller =
ProtoLiteUtils.marshaller(Empty.getDefaultInstance());
MethodDescriptor<Empty, Empty> methodDesc =
MethodDescriptor.newBuilder(marshaller, marshaller)
.setFullMethodName("test/method")
.setType(MethodDescriptor.MethodType.UNARY)
.build();
MethodDescriptor<Empty, Empty> streamingMethodDesc =
MethodDescriptor.newBuilder(marshaller, marshaller)
.setFullMethodName("test/methodServerStreaming")
.setType(MethodDescriptor.MethodType.SERVER_STREAMING)
.build();
AndroidComponentAddress serverAddress;
BinderTransport.BinderClientTransport transport;
private final ObjectPool<ScheduledExecutorService> executorServicePool =
new FixedObjectPool<>(TestingExecutors.sameThreadScheduledExecutor());
private final TestTransportListener transportListener = new TestTransportListener();
private final TestStreamListener streamListener = new TestStreamListener();
private int serverCallsCompleted;
@Before
public void setUp() throws Exception {
ServerCallHandler<Empty, Empty> callHandler =
ServerCalls.asyncUnaryCall(
(req, respObserver) -> {
respObserver.onNext(req);
respObserver.onCompleted();
serverCallsCompleted += 1;
});
ServerCallHandler<Empty, Empty> streamingCallHandler =
ServerCalls.asyncUnaryCall(
(req, respObserver) -> {
for (int i = 0; i < 100; i++) {
respObserver.onNext(req);
}
respObserver.onCompleted();
serverCallsCompleted += 1;
});
ServerServiceDefinition serviceDef =
ServerServiceDefinition.builder("test")
.addMethod(methodDesc, callHandler)
.addMethod(streamingMethodDesc, streamingCallHandler)
.build();
serverAddress = HostServices.allocateService(appContext);
HostServices.configureService(serverAddress,
HostServices.serviceParamsBuilder()
.setServerFactory((service, receiver) ->
BinderServerBuilder.forService(service, receiver)
.addService(serviceDef)
.build())
.build());
transport =
new BinderTransport.BinderClientTransport(
appContext,
serverAddress,
BindServiceFlags.DEFAULTS,
ContextCompat.getMainExecutor(appContext),
executorServicePool,
executorServicePool,
SecurityPolicies.internalOnly(),
InboundParcelablePolicy.DEFAULT,
Attributes.EMPTY);
Runnable r = transport.start(transportListener);
r.run();
transportListener.awaitReady();
}
@After
public void tearDown() throws Exception {
transport.shutdownNow(Status.OK);
HostServices.awaitServiceShutdown();
}
@Test
public void testShutdownBeforeStreamStart_b153326034() throws Exception {
ClientStream stream = transport.newStream(methodDesc, new Metadata(), CallOptions.DEFAULT);
transport.shutdownNow(Status.UNKNOWN.withDescription("reasons"));
// This shouldn't throw an exception.
stream.start(streamListener);
}
@Test
public void testRequestWhileStreamIsWaitingOnCall_b154088869() throws Exception {
ClientStream stream =
transport.newStream(streamingMethodDesc, new Metadata(), CallOptions.DEFAULT);
stream.start(streamListener);
stream.writeMessage(marshaller.stream(Empty.getDefaultInstance()));
stream.halfClose();
stream.request(3);
streamListener.awaitMessages();
streamListener.messageProducer.next();
streamListener.messageProducer.next();
// Without the fix, this loops forever.
stream.request(2);
}
@Test
public void testTransactionForDiscardedCall_b155244043() throws Exception {
ClientStream stream =
transport.newStream(streamingMethodDesc, new Metadata(), CallOptions.DEFAULT);
stream.start(streamListener);
stream.writeMessage(marshaller.stream(Empty.getDefaultInstance()));
assertThat(transport.getOngoingCalls()).hasSize(1);
int callId = transport.getOngoingCalls().keySet().iterator().next();
stream.cancel(Status.UNKNOWN);
// Send a transaction to the no-longer present call ID. It should be silently ignored.
Parcel p = Parcel.obtain();
transport.handleTransaction(callId, p);
p.recycle();
}
@Test
public void testBadTransactionStreamThroughput_b163053382() throws Exception {
ClientStream stream =
transport.newStream(streamingMethodDesc, new Metadata(), CallOptions.DEFAULT);
stream.start(streamListener);
stream.writeMessage(marshaller.stream(Empty.getDefaultInstance()));
stream.halfClose();
stream.request(1000);
// Wait until we receive the first message.
streamListener.awaitMessages();
// Wait until the server actually provides all messages and completes the call.
awaitServerCallsCompleted(1);
// Now we should be able to receive all messages on a single message producer.
assertThat(streamListener.drainMessages()).isEqualTo(100);
}
@Test
public void testMessageProducerClosedAfterStream_b169313545() {
ClientStream stream =
transport.newStream(methodDesc, new Metadata(), CallOptions.DEFAULT);
stream.start(streamListener);
stream.writeMessage(marshaller.stream(Empty.getDefaultInstance()));
stream.halfClose();
stream.request(2);
// Wait until we receive the first message.
streamListener.awaitMessages();
// Now cancel the stream, forcing it to close.
stream.cancel(Status.CANCELLED);
// The message producer shouldn't throw an exception if we drain it now.
streamListener.drainMessages();
}
private synchronized void awaitServerCallsCompleted(int calls) {
while (serverCallsCompleted < calls) {
try {
wait(100);
} catch (InterruptedException inte) {
throw new AssertionError("Interrupted waiting for servercalls");
}
}
}
private static final class TestTransportListener implements ManagedClientTransport.Listener {
public boolean ready;
public boolean inUse;
@Nullable public Status shutdownStatus;
public boolean terminated;
@Override
public void transportShutdown(Status shutdownStatus) {
this.shutdownStatus = shutdownStatus;
}
@Override
public void transportTerminated() {
terminated = true;
}
@Override
public synchronized void transportReady() {
ready = true;
notify();
}
public synchronized void awaitReady() {
while (!ready) {
try {
wait(100);
} catch (InterruptedException inte) {
throw new AssertionError("Interrupted waiting for ready");
}
}
}
@Override
public void transportInUse(boolean inUse) {
this.inUse = inUse;
}
}
private static final class TestStreamListener implements ClientStreamListener {
public StreamListener.MessageProducer messageProducer;
public boolean ready;
public Metadata headers;
@Nullable public Status closedStatus;
@Override
public void messagesAvailable(StreamListener.MessageProducer messageProducer) {
this.messageProducer = messageProducer;
}
public synchronized void awaitMessages() {
while (messageProducer == null) {
try {
wait(100);
} catch (InterruptedException inte) {
throw new AssertionError("Interrupted waiting for messages");
}
}
}
public int drainMessages() {
int n = 0;
while (messageProducer.next() != null) {
n += 1;
}
return n;
}
@Override
public void onReady() {
ready = true;
}
@Override
public void headersRead(Metadata headers) {
this.headers = headers;
}
@Override
public void closed(Status status, Metadata trailers) {
this.closedStatus = status;
}
@Override
public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
this.closedStatus = status;
}
}
}

View File

@ -0,0 +1,264 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.binder;
import static com.google.common.base.Preconditions.checkNotNull;
import android.app.Application;
import android.content.ComponentName;
import android.content.Context;
import androidx.core.content.ContextCompat;
import com.google.errorprone.annotations.DoNotCall;
import io.grpc.ChannelCredentials;
import io.grpc.ChannelLogger;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingChannelBuilder;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.binder.internal.BinderTransport;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ManagedChannelImplBuilder;
import io.grpc.internal.ManagedChannelImplBuilder.ClientTransportFactoryBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* Builder for a gRPC channel which communicates with an Android bound service.
*
* @see <a href="https://developer.android.com/guide/components/bound-services.html">Bound
* Services</a>
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8022")
public final class BinderChannelBuilder
extends ForwardingChannelBuilder<BinderChannelBuilder> {
/**
* Creates a channel builder that will bind to a remote Android service.
*
* <p>The underlying Android binding will be torn down when the channel becomes idle. This happens
* after 30 minutes without use by default but can be configured via {@link
* ManagedChannelBuilder#idleTimeout(long, TimeUnit)} or triggered manually with {@link
* ManagedChannel#enterIdle()}.
*
* <p>You the caller are responsible for managing the lifecycle of any channels built by the
* resulting builder. They will not be shut down automatically.
*
* @param targetAddress the {@link AndroidComponentAddress} referencing the service to bind to.
* @param sourceContext the context to bind from (e.g. The current Activity or Application).
* @return a new builder
*/
public static BinderChannelBuilder forAddress(
AndroidComponentAddress targetAddress, Context sourceContext) {
return new BinderChannelBuilder(targetAddress, sourceContext);
}
/**
* Always fails. Call {@link #forAddress(AndroidComponentAddress, Context)} instead.
*/
@DoNotCall("Unsupported. Use forAddress(AndroidComponentAddress, Context) instead")
public static BinderChannelBuilder forAddress(String name, int port) {
throw new UnsupportedOperationException(
"call forAddress(AndroidComponentAddress, Context) instead");
}
/**
* Always fails. Call {@link #forAddress(AndroidComponentAddress, Context)} instead.
*/
@DoNotCall("Unsupported. Use forAddress(AndroidComponentAddress, Context) instead")
public static BinderChannelBuilder forTarget(String target) {
throw new UnsupportedOperationException(
"call forAddress(AndroidComponentAddress, Context) instead");
}
private final ManagedChannelImplBuilder managedChannelImplBuilder;
private Executor mainThreadExecutor;
private ObjectPool<ScheduledExecutorService> schedulerPool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
private SecurityPolicy securityPolicy;
private InboundParcelablePolicy inboundParcelablePolicy;
private BindServiceFlags bindServiceFlags;
private BinderChannelBuilder(
AndroidComponentAddress targetAddress,
Context sourceContext) {
mainThreadExecutor = ContextCompat.getMainExecutor(sourceContext);
securityPolicy = SecurityPolicies.internalOnly();
inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT;
bindServiceFlags = BindServiceFlags.DEFAULTS;
final class BinderChannelTransportFactoryBuilder
implements ClientTransportFactoryBuilder {
@Override
public ClientTransportFactory buildClientTransportFactory() {
return new TransportFactory(
sourceContext,
mainThreadExecutor,
schedulerPool,
managedChannelImplBuilder.getOffloadExecutorPool(),
securityPolicy,
bindServiceFlags,
inboundParcelablePolicy);
}
}
managedChannelImplBuilder =
new ManagedChannelImplBuilder(
targetAddress,
targetAddress.getAuthority(),
new BinderChannelTransportFactoryBuilder(),
null);
}
@Override
protected ManagedChannelBuilder<?> delegate() {
return managedChannelImplBuilder;
}
/** Specifies certain optional aspects of the underlying Android Service binding. */
public BinderChannelBuilder setBindServiceFlags(BindServiceFlags bindServiceFlags) {
this.bindServiceFlags = bindServiceFlags;
return this;
}
/**
* Provides a custom scheduled executor service.
*
* <p>This is an optional parameter. If the user has not provided a scheduled executor service
* when the channel is built, the builder will use a static cached thread pool.
*
* @return this
*/
public BinderChannelBuilder scheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
schedulerPool =
new FixedObjectPool<>(checkNotNull(scheduledExecutorService, "scheduledExecutorService"));
return this;
}
/**
* Provides a custom {@link Executor} for accessing this application's main thread.
*
* <p>Optional. A default implementation will be used if no custom Executor is provided.
*
* @return this
*/
public BinderChannelBuilder mainThreadExecutor(Executor mainThreadExecutor) {
this.mainThreadExecutor = mainThreadExecutor;
return this;
}
/**
* Provides a custom security policy.
*
* <p>This is optional. If the user has not provided a security policy, this channel will only
* communicate with the same application UID.
*
* @return this
*/
public BinderChannelBuilder securityPolicy(SecurityPolicy securityPolicy) {
this.securityPolicy = checkNotNull(securityPolicy, "securityPolicy");
return this;
}
/** Sets the policy for inbound parcelable objects. */
public BinderChannelBuilder inboundParcelablePolicy(
InboundParcelablePolicy inboundParcelablePolicy) {
this.inboundParcelablePolicy = checkNotNull(inboundParcelablePolicy, "inboundParcelablePolicy");
return this;
}
/** Creates new binder transports. */
private static final class TransportFactory implements ClientTransportFactory {
private final Context sourceContext;
private final Executor mainThreadExecutor;
private final ObjectPool<ScheduledExecutorService> scheduledExecutorPool;
private final ObjectPool<? extends Executor> offloadExecutorPool;
private final SecurityPolicy securityPolicy;
private final InboundParcelablePolicy inboundParcelablePolicy;
private final BindServiceFlags bindServiceFlags;
private ScheduledExecutorService executorService;
private Executor offloadExecutor;
private boolean closed;
TransportFactory(
Context sourceContext,
Executor mainThreadExecutor,
ObjectPool<ScheduledExecutorService> scheduledExecutorPool,
ObjectPool<? extends Executor> offloadExecutorPool,
SecurityPolicy securityPolicy,
BindServiceFlags bindServiceFlags,
InboundParcelablePolicy inboundParcelablePolicy) {
this.sourceContext = sourceContext;
this.mainThreadExecutor = mainThreadExecutor;
this.scheduledExecutorPool = scheduledExecutorPool;
this.offloadExecutorPool = offloadExecutorPool;
this.securityPolicy = securityPolicy;
this.bindServiceFlags = bindServiceFlags;
this.inboundParcelablePolicy = inboundParcelablePolicy;
executorService = scheduledExecutorPool.getObject();
offloadExecutor = offloadExecutorPool.getObject();
}
@Override
public ConnectionClientTransport newClientTransport(
SocketAddress addr, ClientTransportOptions options, ChannelLogger channelLogger) {
if (closed) {
throw new IllegalStateException("The transport factory is closed.");
}
return new BinderTransport.BinderClientTransport(
sourceContext,
(AndroidComponentAddress) addr,
bindServiceFlags,
mainThreadExecutor,
scheduledExecutorPool,
offloadExecutorPool,
securityPolicy,
inboundParcelablePolicy,
options.getEagAttributes());
}
@Override
public ScheduledExecutorService getScheduledExecutorService() {
return executorService;
}
@Override
public SwapChannelCredentialsResult swapChannelCredentials(ChannelCredentials channelCreds) {
return null;
}
@Override
public void close() {
closed = true;
executorService = scheduledExecutorPool.returnObject(executorService);
offloadExecutor = offloadExecutorPool.returnObject(offloadExecutor);
}
}
}

View File

@ -0,0 +1,178 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.binder;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import android.app.Service;
import android.os.IBinder;
import com.google.common.base.Supplier;
import com.google.errorprone.annotations.DoNotCall;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.ExperimentalApi;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerStreamTracer;
import io.grpc.binder.internal.BinderServer;
import io.grpc.binder.internal.BinderTransportSecurity;
import io.grpc.ForwardingServerBuilder;
import io.grpc.internal.FixedObjectPool;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ServerImplBuilder;
import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
/**
* Builder for a server that services requests from an Android Service.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8022")
public final class BinderServerBuilder
extends ForwardingServerBuilder<BinderServerBuilder> {
/**
* Creates a server builder that will bind with the given name.
*
* <p>The listening {@link IBinder} associated with new {@link Server}s will be stored in {@code
* binderReceiver} upon {@link #build()}.
*
* @param service the concrete Android Service that will host this server.
* @param receiver an "out param" for the new {@link Server}'s listening {@link IBinder}
* @return a new builder
*/
public static BinderServerBuilder forService(Service service, IBinderReceiver receiver) {
return new BinderServerBuilder(service, receiver);
}
/**
* Always fails. Call {@link #forService(Service, IBinderReceiver)} instead.
*/
@DoNotCall("Unsupported. Use forService() instead")
public static BinderServerBuilder forPort(int port) {
throw new UnsupportedOperationException("call forService() instead");
}
private final ServerImplBuilder serverImplBuilder;
private ObjectPool<ScheduledExecutorService> schedulerPool =
SharedResourcePool.forResource(GrpcUtil.TIMER_SERVICE);
private ServerSecurityPolicy securityPolicy;
private InboundParcelablePolicy inboundParcelablePolicy;
private BinderServerBuilder(Service service, IBinderReceiver binderReceiver) {
securityPolicy = SecurityPolicies.serverInternalOnly();
inboundParcelablePolicy = InboundParcelablePolicy.DEFAULT;
serverImplBuilder = new ServerImplBuilder(streamTracerFactories -> {
BinderServer server = new BinderServer(
AndroidComponentAddress.forContext(service),
schedulerPool,
streamTracerFactories,
securityPolicy,
inboundParcelablePolicy);
binderReceiver.set(server.getHostBinder());
return server;
});
// Disable compression by default, since there's little benefit when all communication is
// on-device, and it means sending supported-encoding headers with every call.
decompressorRegistry(DecompressorRegistry.emptyInstance());
compressorRegistry(CompressorRegistry.newEmptyInstance());
// Disable stats and tracing by default.
serverImplBuilder.setStatsEnabled(false);
serverImplBuilder.setTracingEnabled(false);
BinderTransportSecurity.installAuthInterceptor(this);
}
@Override
protected ServerBuilder<?> delegate() {
return serverImplBuilder;
}
/** Enable stats collection using census. */
public BinderServerBuilder enableStats() {
serverImplBuilder.setStatsEnabled(true);
return this;
}
/** Enable tracing using census. */
public BinderServerBuilder enableTracing() {
serverImplBuilder.setTracingEnabled(true);
return this;
}
/**
* Provides a custom scheduled executor service.
*
* <p>It's an optional parameter. If the user has not provided a scheduled executor service when
* the channel is built, the builder will use a static cached thread pool.
*
* @return this
*/
public BinderServerBuilder scheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
schedulerPool =
new FixedObjectPool<>(checkNotNull(scheduledExecutorService, "scheduledExecutorService"));
return this;
}
/**
* Provides a custom security policy.
*
* <p>This is optional. If the user has not provided a security policy, the server will default to
* only accepting calls from the same application UID.
*
* @return this
*/
public BinderServerBuilder securityPolicy(ServerSecurityPolicy securityPolicy) {
this.securityPolicy = checkNotNull(securityPolicy, "securityPolicy");
return this;
}
/** Sets the policy for inbound parcelable objects. */
public BinderServerBuilder inboundParcelablePolicy(
InboundParcelablePolicy inboundParcelablePolicy) {
this.inboundParcelablePolicy = checkNotNull(inboundParcelablePolicy, "inboundParcelablePolicy");
return this;
}
@Override
public BinderServerBuilder useTransportSecurity(File certChain, File privateKey) {
throw new UnsupportedOperationException("TLS not supported in BinderServer");
}
/**
* Builds a {@link Server} according to this builder's parameters and stores its listening {@link
* IBinder} in the {@link IBinderReceiver} passed to {@link #forService(Service,
* IBinderReceiver)}.
*
* @return the new Server
*/
@Override // For javadoc refinement only.
public Server build() {
return super.build();
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.binder;
import android.os.IBinder;
import io.grpc.ExperimentalApi;
import javax.annotation.Nullable;
/** A container for at most one instance of {@link IBinder}, useful as an "out parameter". */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8022")
public final class IBinderReceiver {
@Nullable private IBinder value;
/** Constructs a new, initially empty, container. */
public IBinderReceiver() {}
/** Returns the contents of this container or null if it is empty. */
@Nullable
public synchronized IBinder get() {
return value;
}
public synchronized void set(IBinder value) {
this.value = value;
}
}

View File

@ -52,7 +52,7 @@ import javax.annotation.concurrent.ThreadSafe;
* https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md
*/
@ThreadSafe
final class BinderServer implements InternalServer, LeakSafeOneWayBinder.TransactionHandler {
public final class BinderServer implements InternalServer, LeakSafeOneWayBinder.TransactionHandler {
private final ObjectPool<ScheduledExecutorService> executorServicePool;
private final ImmutableList<ServerStreamTracer.Factory> streamTracerFactories;
@ -70,7 +70,7 @@ final class BinderServer implements InternalServer, LeakSafeOneWayBinder.Transac
@GuardedBy("this")
private boolean shutdown;
BinderServer(
public BinderServer(
AndroidComponentAddress listenAddress,
ObjectPool<ScheduledExecutorService> executorServicePool,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,

View File

@ -96,7 +96,7 @@ import javax.annotation.concurrent.ThreadSafe;
* https://github.com/grpc/proposal/blob/master/L73-java-binderchannel/wireformat.md
*/
@ThreadSafe
abstract class BinderTransport
public abstract class BinderTransport
implements LeakSafeOneWayBinder.TransactionHandler, IBinder.DeathRecipient {
private static final Logger logger = Logger.getLogger(BinderTransport.class.getName());
@ -544,7 +544,7 @@ abstract class BinderTransport
/** Concrete client-side transport implementation. */
@ThreadSafe
static final class BinderClientTransport extends BinderTransport
public static final class BinderClientTransport extends BinderTransport
implements ConnectionClientTransport, Bindable.Observer {
private final ObjectPool<? extends Executor> offloadExecutorPool;
@ -561,7 +561,7 @@ abstract class BinderTransport
@GuardedBy("this")
private int latestCallId = FIRST_CALL_ID;
BinderClientTransport(
public BinderClientTransport(
Context sourceContext,
AndroidComponentAddress targetAddress,
BindServiceFlags bindServiceFlags,

View File

@ -36,7 +36,7 @@ import javax.annotation.CheckReturnValue;
* <p>Attaches authorization state to a newly-created transport, and contains a
* ServerInterceptor which ensures calls are authorized before allowing them to proceed.
*/
final class BinderTransportSecurity {
public final class BinderTransportSecurity {
private static final Attributes.Key<TransportAuthorizationState> TRANSPORT_AUTHORIZATION_STATE =
Attributes.Key.create("transport-authorization-state");
@ -48,7 +48,7 @@ final class BinderTransportSecurity {
*
* @param serverBuilder The ServerBuilder being used to create the server.
*/
static void installAuthInterceptor(ServerBuilder<?> serverBuilder) {
public static void installAuthInterceptor(ServerBuilder<?> serverBuilder) {
serverBuilder.intercept(new ServerAuthInterceptor());
}