mirror of https://github.com/grpc/grpc-java.git
Permit overriding Channel's executor per-call
This commit is contained in:
parent
70ef5b1172
commit
f59e04f310
|
|
@ -33,6 +33,7 @@ package io.grpc;
|
||||||
|
|
||||||
import com.google.common.base.MoreObjects;
|
import com.google.common.base.MoreObjects;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
@ -54,6 +55,7 @@ public final class CallOptions {
|
||||||
// them outside of constructor. Otherwise the constructor will have a potentially long list of
|
// them outside of constructor. Otherwise the constructor will have a potentially long list of
|
||||||
// unnamed arguments, which is undesirable.
|
// unnamed arguments, which is undesirable.
|
||||||
private Long deadlineNanoTime;
|
private Long deadlineNanoTime;
|
||||||
|
private Executor executor;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private String authority;
|
private String authority;
|
||||||
|
|
@ -144,6 +146,21 @@ public final class CallOptions {
|
||||||
return authority;
|
return authority;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new {@code CallOptions} with {@code executor} to be used instead of the default
|
||||||
|
* executor specified with {@link ManagedChannelBuilder#executor}.
|
||||||
|
*/
|
||||||
|
public CallOptions withExecutor(Executor executor) {
|
||||||
|
CallOptions newOptions = new CallOptions(this);
|
||||||
|
newOptions.executor = executor;
|
||||||
|
return newOptions;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public Executor getExecutor() {
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
|
||||||
private CallOptions() {
|
private CallOptions() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -323,6 +323,10 @@ public final class ManagedChannelImpl extends ManagedChannel {
|
||||||
@Override
|
@Override
|
||||||
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
|
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
|
||||||
CallOptions callOptions) {
|
CallOptions callOptions) {
|
||||||
|
Executor executor = callOptions.getExecutor();
|
||||||
|
if (executor == null) {
|
||||||
|
executor = ManagedChannelImpl.this.executor;
|
||||||
|
}
|
||||||
return new ClientCallImpl<ReqT, RespT>(
|
return new ClientCallImpl<ReqT, RespT>(
|
||||||
method,
|
method,
|
||||||
executor,
|
executor,
|
||||||
|
|
|
||||||
|
|
@ -37,11 +37,13 @@ import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import com.google.common.base.Objects;
|
import com.google.common.base.Objects;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.JUnit4;
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/** Unit tests for {@link CallOptions}. */
|
/** Unit tests for {@link CallOptions}. */
|
||||||
|
|
@ -60,6 +62,7 @@ public class CallOptionsTest {
|
||||||
assertNull(CallOptions.DEFAULT.getDeadlineNanoTime());
|
assertNull(CallOptions.DEFAULT.getDeadlineNanoTime());
|
||||||
assertNull(CallOptions.DEFAULT.getAuthority());
|
assertNull(CallOptions.DEFAULT.getAuthority());
|
||||||
assertNull(CallOptions.DEFAULT.getRequestKey());
|
assertNull(CallOptions.DEFAULT.getRequestKey());
|
||||||
|
assertNull(CallOptions.DEFAULT.getExecutor());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -89,6 +92,17 @@ public class CallOptionsTest {
|
||||||
assertNull(options2.getDeadlineNanoTime());
|
assertNull(options2.getDeadlineNanoTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mutateExecutor() {
|
||||||
|
Executor executor = MoreExecutors.directExecutor();
|
||||||
|
CallOptions options1 = CallOptions.DEFAULT.withExecutor(executor);
|
||||||
|
assertNull(CallOptions.DEFAULT.getExecutor());
|
||||||
|
assertSame(executor, options1.getExecutor());
|
||||||
|
CallOptions options2 = options1.withExecutor(null);
|
||||||
|
assertSame(executor, options1.getExecutor());
|
||||||
|
assertNull(options2.getExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWithDeadlineAfter() {
|
public void testWithDeadlineAfter() {
|
||||||
long deadline = CallOptions.DEFAULT
|
long deadline = CallOptions.DEFAULT
|
||||||
|
|
|
||||||
|
|
@ -88,6 +88,7 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
@ -314,6 +315,37 @@ public class ManagedChannelImplTest {
|
||||||
transportListener.transportTerminated();
|
transportListener.transportTerminated();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void callOptionsExecutor() {
|
||||||
|
Metadata headers = new Metadata();
|
||||||
|
ClientStream mockStream = mock(ClientStream.class);
|
||||||
|
when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream);
|
||||||
|
|
||||||
|
final List<Runnable> runnables = new ArrayList<Runnable>();
|
||||||
|
Executor executor = new Executor() {
|
||||||
|
@Override
|
||||||
|
public void execute(Runnable r) {
|
||||||
|
runnables.add(r);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
ManagedChannel channel = createChannel(
|
||||||
|
new FakeNameResolverFactory(true), NO_INTERCEPTOR);
|
||||||
|
ClientCall<String, Integer> call =
|
||||||
|
channel.newCall(method, CallOptions.DEFAULT.withExecutor(executor));
|
||||||
|
call.start(mockCallListener, headers);
|
||||||
|
verify(mockTransport, timeout(1000)).newStream(same(method), same(headers));
|
||||||
|
verify(mockStream).start(streamListenerCaptor.capture());
|
||||||
|
ClientStreamListener streamListener = streamListenerCaptor.getValue();
|
||||||
|
Metadata trailers = new Metadata();
|
||||||
|
streamListener.closed(Status.CANCELLED, trailers);
|
||||||
|
assertFalse(runnables.isEmpty());
|
||||||
|
verify(mockCallListener, never()).onClose(same(Status.CANCELLED), same(trailers));
|
||||||
|
for (Runnable r : runnables) {
|
||||||
|
r.run();
|
||||||
|
}
|
||||||
|
verify(mockCallListener).onClose(same(Status.CANCELLED), same(trailers));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void nameResolutionFailed() {
|
public void nameResolutionFailed() {
|
||||||
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
|
Status error = Status.UNAVAILABLE.withCause(new Throwable("fake name resolution error"));
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue