mirror of https://github.com/grpc/grpc-java.git
Introduce the AsyncSecurityPolicy class. (#10622)
This is the async variant of SecurityPolicy, allowing callers to implement security checks based on slow calls that aren't meant to block the gRPC thread. BinderTransportSecurity.checkAuthorization **STILL** blocks while attempting to resolve the ListenableFuture<Status> it gets from the policy object. That will still be adressed in a follow-up. Relate issue: #10566
This commit is contained in:
parent
cd810c5284
commit
b6947de95a
|
|
@ -23,6 +23,9 @@ import android.content.Context;
|
|||
import androidx.test.core.app.ApplicationProvider;
|
||||
import androidx.test.ext.junit.runners.AndroidJUnit4;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.protobuf.Empty;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.ManagedChannel;
|
||||
|
|
@ -155,7 +158,7 @@ public final class BinderSecurityTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testServerDisllowsCalls() throws Exception {
|
||||
public void testServerDisallowsCalls() throws Exception {
|
||||
createChannel(
|
||||
ServerSecurityPolicy.newBuilder()
|
||||
.servicePolicy("foo", policy((uid) -> false))
|
||||
|
|
@ -197,6 +200,25 @@ public final class BinderSecurityTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPerServicePolicyAsync() throws Exception {
|
||||
createChannel(
|
||||
ServerSecurityPolicy.newBuilder()
|
||||
.servicePolicy("foo", asyncPolicy((uid) -> Futures.immediateFuture(true)))
|
||||
.servicePolicy("bar", asyncPolicy((uid) -> Futures.immediateFuture(false)))
|
||||
.build(),
|
||||
SecurityPolicies.internalOnly());
|
||||
|
||||
assertThat(methods).isNotEmpty();
|
||||
for (MethodDescriptor<Empty, Empty> method : methods.values()) {
|
||||
if (method.getServiceName().equals("bar")) {
|
||||
assertCallFailure(method, Status.PERMISSION_DENIED);
|
||||
} else {
|
||||
assertCallSuccess(method);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecurityInterceptorIsClosestToTransport() throws Exception {
|
||||
createChannel(
|
||||
|
|
@ -227,6 +249,20 @@ public final class BinderSecurityTest {
|
|||
};
|
||||
}
|
||||
|
||||
private static AsyncSecurityPolicy asyncPolicy(
|
||||
Function<Integer, ListenableFuture<Boolean>> func) {
|
||||
return new AsyncSecurityPolicy() {
|
||||
@Override
|
||||
public ListenableFuture<Status> checkAuthorizationAsync(int uid) {
|
||||
return Futures
|
||||
.transform(
|
||||
func.apply(uid),
|
||||
allowed -> allowed ? Status.OK : Status.PERMISSION_DENIED,
|
||||
MoreExecutors.directExecutor());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private final class CountingServerInterceptor implements ServerInterceptor {
|
||||
int numInterceptedCalls;
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,71 @@
|
|||
/*
|
||||
* Copyright 2023 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.binder;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import io.grpc.ExperimentalApi;
|
||||
import io.grpc.Status;
|
||||
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import javax.annotation.CheckReturnValue;
|
||||
|
||||
/**
|
||||
* Decides whether a given Android UID is authorized to access some resource.
|
||||
*
|
||||
* <p>This class provides the asynchronous version of {@link SecurityPolicy}, allowing
|
||||
* implementations of authorization logic that involves slow or asynchronous calls without
|
||||
* necessarily blocking the calling thread.
|
||||
*
|
||||
* @see SecurityPolicy
|
||||
*/
|
||||
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10566")
|
||||
@CheckReturnValue
|
||||
public abstract class AsyncSecurityPolicy extends SecurityPolicy {
|
||||
|
||||
/**
|
||||
* @deprecated Prefer {@link #checkAuthorizationAsync(int)} for async or slow calls or subclass
|
||||
* {@link SecurityPolicy} directly for quick, synchronous implementations.
|
||||
*/
|
||||
@Override
|
||||
@Deprecated
|
||||
public final Status checkAuthorization(int uid) {
|
||||
try {
|
||||
return checkAuthorizationAsync(uid).get();
|
||||
} catch (ExecutionException e) {
|
||||
return Status.fromThrowable(e);
|
||||
} catch (CancellationException e) {
|
||||
return Status.CANCELLED.withCause(e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt(); // re-set the current thread's interruption state
|
||||
return Status.CANCELLED.withCause(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decides whether the given Android UID is authorized. (Validity is implementation dependent).
|
||||
*
|
||||
* <p>As long as any given UID has active processes, this method should return the same value for
|
||||
* that UID. In order words, policy changes which occur while a transport instance is active, will
|
||||
* have no effect on that transport instance.
|
||||
*
|
||||
* @param uid The Android UID to authenticate.
|
||||
* @return A {@link ListenableFuture} for a gRPC {@link Status} object, with OK indicating
|
||||
* authorized.
|
||||
*/
|
||||
abstract ListenableFuture<Status> checkAuthorizationAsync(int uid);
|
||||
}
|
||||
|
|
@ -72,6 +72,10 @@ public final class ServerSecurityPolicy {
|
|||
@CheckReturnValue
|
||||
ListenableFuture<Status> checkAuthorizationForServiceAsync(int uid, String serviceName) {
|
||||
SecurityPolicy securityPolicy = perServicePolicies.getOrDefault(serviceName, defaultPolicy);
|
||||
if (securityPolicy instanceof AsyncSecurityPolicy) {
|
||||
return ((AsyncSecurityPolicy) securityPolicy).checkAuthorizationAsync(uid);
|
||||
}
|
||||
|
||||
try {
|
||||
Status status = securityPolicy.checkAuthorization(uid);
|
||||
return Futures.immediateFuture(status);
|
||||
|
|
|
|||
|
|
@ -18,12 +18,22 @@ package io.grpc.binder;
|
|||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
import android.os.Process;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusException;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.robolectric.RobolectricTestRunner;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
@RunWith(RobolectricTestRunner.class)
|
||||
public final class ServerSecurityPolicyTest {
|
||||
|
|
@ -81,6 +91,86 @@ public final class ServerSecurityPolicyTest {
|
|||
.isEqualTo(Status.OK.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPerServiceAsync() {
|
||||
policy =
|
||||
ServerSecurityPolicy.newBuilder()
|
||||
.servicePolicy(SERVICE2, asyncPolicy(uid -> {
|
||||
// Add some extra future transformation to confirm that a chain
|
||||
// of futures gets properly handled.
|
||||
ListenableFuture<Void> dependency = Futures.immediateVoidFuture();
|
||||
return Futures
|
||||
.transform(dependency, unused -> Status.OK, MoreExecutors.directExecutor());
|
||||
}))
|
||||
.build();
|
||||
|
||||
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
|
||||
.isEqualTo(Status.OK.getCode());
|
||||
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE1).getCode())
|
||||
.isEqualTo(Status.PERMISSION_DENIED.getCode());
|
||||
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE2).getCode())
|
||||
.isEqualTo(Status.OK.getCode());
|
||||
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE2).getCode())
|
||||
.isEqualTo(Status.OK.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPerService_throwingExceptionAsynchronously_propagatesStatusFromException() {
|
||||
policy =
|
||||
ServerSecurityPolicy.newBuilder()
|
||||
.servicePolicy(SERVICE1, asyncPolicy(uid ->
|
||||
Futures
|
||||
.immediateFailedFuture(
|
||||
new StatusException(Status.fromCode(Status.Code.ALREADY_EXISTS)))
|
||||
))
|
||||
.build();
|
||||
|
||||
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
|
||||
.isEqualTo(Status.ALREADY_EXISTS.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPerServiceAsync_cancelledFuture_propagatesStatus() {
|
||||
policy =
|
||||
ServerSecurityPolicy.newBuilder()
|
||||
.servicePolicy(SERVICE1, asyncPolicy(unused -> Futures.immediateCancelledFuture()))
|
||||
.build();
|
||||
|
||||
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
|
||||
.isEqualTo(Status.CANCELLED.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPerServiceAsync_interrupted_cancelledStatus() {
|
||||
ListeningExecutorService listeningExecutorService =
|
||||
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
|
||||
CountDownLatch unsatisfiedLatch = new CountDownLatch(1);
|
||||
ListenableFuture<Status> toBeInterruptedFuture = listeningExecutorService.submit(() -> {
|
||||
unsatisfiedLatch.await(); // waits forever
|
||||
return null;
|
||||
});
|
||||
|
||||
CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
Thread testThread = Thread.currentThread();
|
||||
new Thread(() -> {
|
||||
awaitOrFail(barrier);
|
||||
testThread.interrupt();
|
||||
}).start();
|
||||
|
||||
policy =
|
||||
ServerSecurityPolicy.newBuilder()
|
||||
.servicePolicy(SERVICE1, asyncPolicy(unused -> {
|
||||
awaitOrFail(barrier);
|
||||
return toBeInterruptedFuture;
|
||||
}))
|
||||
.build();
|
||||
|
||||
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
|
||||
.isEqualTo(Status.CANCELLED.getCode());
|
||||
assertThat(Thread.currentThread().isInterrupted()).isTrue();
|
||||
listeningExecutorService.shutdownNow();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPerServiceNoDefault() {
|
||||
policy =
|
||||
|
|
@ -109,6 +199,49 @@ public final class ServerSecurityPolicyTest {
|
|||
.isEqualTo(Status.PERMISSION_DENIED.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPerServiceNoDefaultAsync() {
|
||||
policy =
|
||||
ServerSecurityPolicy.newBuilder()
|
||||
.servicePolicy(
|
||||
SERVICE1,
|
||||
asyncPolicy((uid) -> Futures.immediateFuture(Status.INTERNAL)))
|
||||
.servicePolicy(
|
||||
SERVICE2, asyncPolicy((uid) -> {
|
||||
// Add some extra future transformation to confirm that a chain
|
||||
// of futures gets properly handled.
|
||||
ListenableFuture<Boolean> anotherUidFuture =
|
||||
Futures.immediateFuture(uid == OTHER_UID);
|
||||
return Futures
|
||||
.transform(
|
||||
anotherUidFuture,
|
||||
anotherUid ->
|
||||
anotherUid
|
||||
? Status.OK
|
||||
: Status.PERMISSION_DENIED,
|
||||
MoreExecutors.directExecutor());
|
||||
}))
|
||||
.build();
|
||||
|
||||
// Uses the specified policy for service1.
|
||||
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE1).getCode())
|
||||
.isEqualTo(Status.INTERNAL.getCode());
|
||||
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE1).getCode())
|
||||
.isEqualTo(Status.INTERNAL.getCode());
|
||||
|
||||
// Uses the specified policy for service2.
|
||||
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE2).getCode())
|
||||
.isEqualTo(Status.PERMISSION_DENIED.getCode());
|
||||
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE2).getCode())
|
||||
.isEqualTo(Status.OK.getCode());
|
||||
|
||||
// Falls back to the default.
|
||||
assertThat(policy.checkAuthorizationForService(MY_UID, SERVICE3).getCode())
|
||||
.isEqualTo(Status.OK.getCode());
|
||||
assertThat(policy.checkAuthorizationForService(OTHER_UID, SERVICE3).getCode())
|
||||
.isEqualTo(Status.PERMISSION_DENIED.getCode());
|
||||
}
|
||||
|
||||
private static SecurityPolicy policy(Function<Integer, Status> func) {
|
||||
return new SecurityPolicy() {
|
||||
@Override
|
||||
|
|
@ -117,4 +250,24 @@ public final class ServerSecurityPolicyTest {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static AsyncSecurityPolicy asyncPolicy(Function<Integer, ListenableFuture<Status>> func) {
|
||||
return new AsyncSecurityPolicy() {
|
||||
@Override
|
||||
public ListenableFuture<Status> checkAuthorizationAsync(int uid) {
|
||||
return func.apply(uid);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static void awaitOrFail(CyclicBarrier barrier) {
|
||||
try {
|
||||
barrier.await();
|
||||
} catch (BrokenBarrierException e) {
|
||||
fail(e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue