mirror of https://github.com/grpc/grpc-java.git
Do not cache failed futures for async security policies indefinitely. (#10743)
Currently, if caching is enabled (as is often the case) and AsyncSecurityPolicy returns a failed future, then this future is cached forever, without giving the SecurityPolicy implementation a chance to be retried. Going forward, new invocations will trigger new security checks if the last one could not be completed successfuly.
This commit is contained in:
parent
062f7a2072
commit
5b082ca640
|
|
@ -26,6 +26,7 @@ import com.google.common.base.Function;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import com.google.errorprone.annotations.CanIgnoreReturnValue;
|
||||||
import com.google.protobuf.Empty;
|
import com.google.protobuf.Empty;
|
||||||
import io.grpc.CallOptions;
|
import io.grpc.CallOptions;
|
||||||
import io.grpc.ManagedChannel;
|
import io.grpc.ManagedChannel;
|
||||||
|
|
@ -45,6 +46,8 @@ import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
@ -76,6 +79,7 @@ public final class BinderSecurityTest {
|
||||||
MethodDescriptor.newBuilder(marshaller, marshaller)
|
MethodDescriptor.newBuilder(marshaller, marshaller)
|
||||||
.setFullMethodName(name)
|
.setFullMethodName(name)
|
||||||
.setType(MethodDescriptor.MethodType.UNARY)
|
.setType(MethodDescriptor.MethodType.UNARY)
|
||||||
|
.setSampledToLocalTracing(true)
|
||||||
.build();
|
.build();
|
||||||
ServerCallHandler<Empty, Empty> callHandler =
|
ServerCallHandler<Empty, Empty> callHandler =
|
||||||
ServerCalls.asyncUnaryCall(
|
ServerCalls.asyncUnaryCall(
|
||||||
|
|
@ -139,12 +143,16 @@ public final class BinderSecurityTest {
|
||||||
.isNotNull();
|
.isNotNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertCallFailure(MethodDescriptor<Empty, Empty> method, Status status) {
|
@CanIgnoreReturnValue
|
||||||
|
private StatusRuntimeException assertCallFailure(
|
||||||
|
MethodDescriptor<Empty, Empty> method, Status status) {
|
||||||
try {
|
try {
|
||||||
ClientCalls.blockingUnaryCall(channel, method, CallOptions.DEFAULT, null);
|
ClientCalls.blockingUnaryCall(channel, method, CallOptions.DEFAULT, null);
|
||||||
fail();
|
fail("Expected call to " + method.getFullMethodName() + " to fail but it succeeded.");
|
||||||
|
throw new AssertionError(); // impossible
|
||||||
} catch (StatusRuntimeException sre) {
|
} catch (StatusRuntimeException sre) {
|
||||||
assertThat(sre.getStatus().getCode()).isEqualTo(status.getCode());
|
assertThat(sre.getStatus().getCode()).isEqualTo(status.getCode());
|
||||||
|
return sre;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -172,6 +180,70 @@ public final class BinderSecurityTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailedFuturesPropagateOriginalException() throws Exception {
|
||||||
|
String errorMessage = "something went wrong";
|
||||||
|
IllegalStateException originalException = new IllegalStateException(errorMessage);
|
||||||
|
createChannel(
|
||||||
|
ServerSecurityPolicy.newBuilder()
|
||||||
|
.servicePolicy("foo", new AsyncSecurityPolicy() {
|
||||||
|
@Override
|
||||||
|
ListenableFuture<Status> checkAuthorizationAsync(int uid) {
|
||||||
|
return Futures.immediateFailedFuture(originalException);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.build(),
|
||||||
|
SecurityPolicies.internalOnly());
|
||||||
|
MethodDescriptor<Empty, Empty> method = methods.get("foo/method0");
|
||||||
|
|
||||||
|
StatusRuntimeException sre = assertCallFailure(method, Status.INTERNAL);
|
||||||
|
assertThat(sre.getStatus().getDescription()).contains(errorMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFailedFuturesAreNotCachedPermanently() throws Exception {
|
||||||
|
AtomicReference<Boolean> firstAttempt = new AtomicReference<>(true);
|
||||||
|
createChannel(
|
||||||
|
ServerSecurityPolicy.newBuilder()
|
||||||
|
.servicePolicy("foo", new AsyncSecurityPolicy() {
|
||||||
|
@Override
|
||||||
|
ListenableFuture<Status> checkAuthorizationAsync(int uid) {
|
||||||
|
if (firstAttempt.getAndSet(false)) {
|
||||||
|
return Futures.immediateFailedFuture(new IllegalStateException());
|
||||||
|
}
|
||||||
|
return Futures.immediateFuture(Status.OK);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.build(),
|
||||||
|
SecurityPolicies.internalOnly());
|
||||||
|
MethodDescriptor<Empty, Empty> method = methods.get("foo/method0");
|
||||||
|
|
||||||
|
assertCallFailure(method, Status.INTERNAL);
|
||||||
|
assertCallSuccess(method);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelledFuturesAreNotCachedPermanently() throws Exception {
|
||||||
|
AtomicReference<Boolean> firstAttempt = new AtomicReference<>(true);
|
||||||
|
createChannel(
|
||||||
|
ServerSecurityPolicy.newBuilder()
|
||||||
|
.servicePolicy("foo", new AsyncSecurityPolicy() {
|
||||||
|
@Override
|
||||||
|
ListenableFuture<Status> checkAuthorizationAsync(int uid) {
|
||||||
|
if (firstAttempt.getAndSet(false)) {
|
||||||
|
return Futures.immediateCancelledFuture();
|
||||||
|
}
|
||||||
|
return Futures.immediateFuture(Status.OK);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.build(),
|
||||||
|
SecurityPolicies.internalOnly());
|
||||||
|
MethodDescriptor<Empty, Empty> method = methods.get("foo/method0");
|
||||||
|
|
||||||
|
assertCallFailure(method, Status.INTERNAL);
|
||||||
|
assertCallSuccess(method);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testClientDoesntTrustServer() throws Exception {
|
public void testClientDoesntTrustServer() throws Exception {
|
||||||
createChannel(SecurityPolicies.serverInternalOnly(), policy((uid) -> false));
|
createChannel(SecurityPolicies.serverInternalOnly(), policy((uid) -> false));
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ package io.grpc.binder.internal;
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
|
||||||
import io.grpc.Attributes;
|
import io.grpc.Attributes;
|
||||||
import io.grpc.Internal;
|
import io.grpc.Internal;
|
||||||
|
|
@ -32,6 +33,7 @@ import io.grpc.ServerInterceptor;
|
||||||
import io.grpc.Status;
|
import io.grpc.Status;
|
||||||
import io.grpc.internal.GrpcAttributes;
|
import io.grpc.internal.GrpcAttributes;
|
||||||
|
|
||||||
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
|
|
@ -110,9 +112,13 @@ public final class BinderTransportSecurity {
|
||||||
Status authStatus;
|
Status authStatus;
|
||||||
try {
|
try {
|
||||||
authStatus = Futures.getDone(authStatusFuture);
|
authStatus = Futures.getDone(authStatusFuture);
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException | CancellationException e) {
|
||||||
// Failed futures are treated as an internal error rather than a security rejection.
|
// Failed futures are treated as an internal error rather than a security rejection.
|
||||||
authStatus = Status.INTERNAL.withCause(e);
|
authStatus = Status.INTERNAL.withCause(e);
|
||||||
|
@Nullable String message = e.getMessage();
|
||||||
|
if (message != null) {
|
||||||
|
authStatus = authStatus.withDescription(message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (authStatus.isOk()) {
|
if (authStatus.isOk()) {
|
||||||
|
|
@ -179,6 +185,8 @@ public final class BinderTransportSecurity {
|
||||||
if (useCache) {
|
if (useCache) {
|
||||||
@Nullable ListenableFuture<Status> authorization = serviceAuthorization.get(serviceName);
|
@Nullable ListenableFuture<Status> authorization = serviceAuthorization.get(serviceName);
|
||||||
if (authorization != null) {
|
if (authorization != null) {
|
||||||
|
// Authorization check exists and is a pending or successful future (even if for a
|
||||||
|
// failed authorization).
|
||||||
return authorization;
|
return authorization;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -193,6 +201,15 @@ public final class BinderTransportSecurity {
|
||||||
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
|
serverPolicyChecker.checkAuthorizationForServiceAsync(uid, serviceName);
|
||||||
if (useCache) {
|
if (useCache) {
|
||||||
serviceAuthorization.putIfAbsent(serviceName, authorization);
|
serviceAuthorization.putIfAbsent(serviceName, authorization);
|
||||||
|
Futures.addCallback(authorization, new FutureCallback<Status>() {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Status result) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable t) {
|
||||||
|
serviceAuthorization.remove(serviceName, authorization);
|
||||||
|
}
|
||||||
|
}, MoreExecutors.directExecutor());
|
||||||
}
|
}
|
||||||
return authorization;
|
return authorization;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue