mirror of https://github.com/grpc/grpc-java.git
Move newstub to third_party and rename to stub.
It was an oversight that newstub was not in third_party when committed. Renaming of package was preformed via a script. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=68416211
This commit is contained in:
parent
2ce8446a0d
commit
8be938ee25
|
|
@ -0,0 +1,21 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for all stub configurations.
|
||||||
|
*/
|
||||||
|
public abstract class AbstractServiceDescriptor<T extends AbstractServiceDescriptor> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the list of operations defined in the stub configuration.
|
||||||
|
*/
|
||||||
|
public abstract ImmutableList<MethodDescriptor> methods();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new stub configuration for the provided method configurations.
|
||||||
|
*/
|
||||||
|
protected abstract T build(Map<String, MethodDescriptor> methodMap);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,80 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Common base type for stub implementations. Allows for reconfiguration.
|
||||||
|
*/
|
||||||
|
// TODO(user): Move into 3rd party when tidy
|
||||||
|
// TOOD(lryan/kevinb): Excessive parameterization can be a pain, try to eliminate once the generated
|
||||||
|
// code is more tangible.
|
||||||
|
public abstract class AbstractStub<S extends AbstractStub, C extends AbstractServiceDescriptor<C>> {
|
||||||
|
protected final Channel channel;
|
||||||
|
protected final C config;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor for use by subclasses.
|
||||||
|
*/
|
||||||
|
protected AbstractStub(Channel channel, C config) {
|
||||||
|
this.channel = channel;
|
||||||
|
this.config = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected C getServiceDescriptor() {
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
public StubConfigBuilder configureNewStub() {
|
||||||
|
return new StubConfigBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new stub configuration for the provided method configurations.
|
||||||
|
*/
|
||||||
|
protected abstract S build(Channel channel, C config);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class for (re) configuring the operations in a stub.
|
||||||
|
*/
|
||||||
|
public class StubConfigBuilder {
|
||||||
|
|
||||||
|
private final Map<String, MethodDescriptor> methodMap;
|
||||||
|
|
||||||
|
private StubConfigBuilder() {
|
||||||
|
methodMap = Maps.newHashMapWithExpectedSize(config.methods().size());
|
||||||
|
for (MethodDescriptor method : AbstractStub.this.config.methods()) {
|
||||||
|
methodMap.put(method.getName(), method);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set a timeout for all methods in the stub.
|
||||||
|
*/
|
||||||
|
public StubConfigBuilder setTimeout(long timeout, TimeUnit unit) {
|
||||||
|
for (MethodDescriptor methodDescriptor : methodMap.values()) {
|
||||||
|
setTimeout(methodDescriptor, timeout, unit);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the timeout for the specified method in microseconds.
|
||||||
|
*/
|
||||||
|
private StubConfigBuilder setTimeout(MethodDescriptor method, long timeout, TimeUnit unit) {
|
||||||
|
// This is the pattern we would use for per-method configuration but currently
|
||||||
|
// no strong use-cases for it, hence private here
|
||||||
|
methodMap.put(method.getName(), method.withTimeout(timeout, unit));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new stub configuration
|
||||||
|
*/
|
||||||
|
public S build() {
|
||||||
|
return AbstractStub.this.build(channel, config.build(methodMap));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulates the producer and consumer aspects of a call. A Call is a deferred execution context
|
||||||
|
* that does not dispatch data to the remote implementation until 'start' is called.
|
||||||
|
* <p>
|
||||||
|
* Call and it's sub-classes are used by the stub code generators to produced typed stubs.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
// TODO(user): Implement context support
|
||||||
|
public interface Call<RequestT, ResponseT> extends StreamObserver<RequestT> {
|
||||||
|
/**
|
||||||
|
* Start a call passing it the response {@link StreamObserver}.
|
||||||
|
* @param responseObserver Which receive response messages.
|
||||||
|
*/
|
||||||
|
public void start(StreamObserver<ResponseT> responseObserver);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,14 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Common handling for context data
|
||||||
|
*/
|
||||||
|
// TODO(user): Implement
|
||||||
|
public class CallContext {
|
||||||
|
|
||||||
|
void addResponseContext(String name, InputStream value) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,243 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
import com.google.net.stubby.Status;
|
||||||
|
import com.google.net.stubby.transport.Transport;
|
||||||
|
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
import java.util.concurrent.CancellationException;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility functions for processing different call idioms. We have one-to-one correspondence
|
||||||
|
* between utilities in this class and the potential signatures in a generated stub class so
|
||||||
|
* that the runtime can vary behavior without requiring regeneration of the stub.
|
||||||
|
*/
|
||||||
|
public class Calls {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a unary call and return a {@link ListenableFuture} to the response.
|
||||||
|
* @return a future for the single response message.
|
||||||
|
*/
|
||||||
|
public static <ReqT, RespT> ListenableFuture<RespT> unaryFutureCall(
|
||||||
|
Call<ReqT, RespT> call,
|
||||||
|
ReqT param) {
|
||||||
|
SettableFuture<RespT> responseFuture = SettableFuture.create();
|
||||||
|
asyncServerStreamingCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture));
|
||||||
|
return responseFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a unary call and block on the response.
|
||||||
|
* @return the single response message.
|
||||||
|
*/
|
||||||
|
public static <ReqT, RespT> RespT blockingUnaryCall(Call<ReqT, RespT> call, ReqT param) {
|
||||||
|
return Futures.getUnchecked(unaryFutureCall(call, param));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a unary call with a response {@link StreamObserver}.
|
||||||
|
*/
|
||||||
|
public static <ReqT, RespT> void asyncUnaryCall(
|
||||||
|
Call<ReqT, RespT> call,
|
||||||
|
ReqT param,
|
||||||
|
StreamObserver<RespT> observer) {
|
||||||
|
asyncServerStreamingCall(call, param, observer);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a server-streaming call returning a blocking {@link Iterator} over the
|
||||||
|
* response stream.
|
||||||
|
* @return an iterator over the response stream.
|
||||||
|
*/
|
||||||
|
// TODO(user): Not clear if we want to use this idiom for 'simple' stubs.
|
||||||
|
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
|
||||||
|
Call<ReqT, RespT> call, ReqT param) {
|
||||||
|
// This is an interesting scenario for flow control...
|
||||||
|
// TODO(user): Capacity restriction is entirely arbitrary, need to parameterize.
|
||||||
|
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(4096);
|
||||||
|
asyncServerStreamingCall(call, param, result);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a server-streaming call with a response {@link StreamObserver}.
|
||||||
|
*/
|
||||||
|
public static <ReqT, RespT> void asyncServerStreamingCall(
|
||||||
|
Call<ReqT, RespT> call,
|
||||||
|
ReqT param,
|
||||||
|
StreamObserver<RespT> responseObserver) {
|
||||||
|
call.start(responseObserver);
|
||||||
|
try {
|
||||||
|
call.onValue(param);
|
||||||
|
call.onCompleted();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
call.onError(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a client-streaming call with a blocking {@link Iterator} of request messages.
|
||||||
|
* @return the single response value.
|
||||||
|
*/
|
||||||
|
public static <ReqT, RespT> RespT blockingClientStreamingCall(
|
||||||
|
Call<ReqT, RespT> call,
|
||||||
|
Iterator<ReqT> clientStream) {
|
||||||
|
SettableFuture<RespT> responseFuture = SettableFuture.create();
|
||||||
|
call.start(new UnaryStreamToFuture<RespT>(responseFuture));
|
||||||
|
try {
|
||||||
|
while (clientStream.hasNext()) {
|
||||||
|
call.onValue(clientStream.next());
|
||||||
|
}
|
||||||
|
call.onCompleted();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
// Notify runtime of the error which will cancel the call
|
||||||
|
call.onError(t);
|
||||||
|
throw Throwables.propagate(t);
|
||||||
|
}
|
||||||
|
return Futures.getUnchecked(responseFuture);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a client-streaming call returning a {@link StreamObserver} for the request messages.
|
||||||
|
* @return request stream observer.
|
||||||
|
*/
|
||||||
|
public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
|
||||||
|
Call<ReqT, RespT> call,
|
||||||
|
StreamObserver<RespT> responseObserver) {
|
||||||
|
return duplexStreamingCall(call, responseObserver);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a duplex-streaming call.
|
||||||
|
* @return request stream observer.
|
||||||
|
*/
|
||||||
|
public static <ReqT, RespT> StreamObserver<ReqT> duplexStreamingCall(
|
||||||
|
Call<ReqT, RespT> call, StreamObserver<RespT> responseObserver) {
|
||||||
|
call.start(responseObserver);
|
||||||
|
return call;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Complete a SettableFuture using {@link StreamObserver} events.
|
||||||
|
*/
|
||||||
|
private static class UnaryStreamToFuture<RespT> implements StreamObserver<RespT> {
|
||||||
|
private final SettableFuture<RespT> responseFuture;
|
||||||
|
private RespT value;
|
||||||
|
|
||||||
|
public UnaryStreamToFuture(SettableFuture<RespT> responseFuture) {
|
||||||
|
this.responseFuture = responseFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onValue(RespT value) {
|
||||||
|
if (this.value != null) {
|
||||||
|
throw new Status(Transport.Code.INTERNAL, "More than one value received for unary call")
|
||||||
|
.asRuntimeException();
|
||||||
|
}
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
responseFuture.setException(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
if (value == null) {
|
||||||
|
// No value received so mark the future as an error
|
||||||
|
responseFuture.setException(
|
||||||
|
new Status(Transport.Code.INTERNAL, "No value received for unary call")
|
||||||
|
.asRuntimeException().fillInStackTrace());
|
||||||
|
}
|
||||||
|
responseFuture.set(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert events on a {@link StreamObserver} into a blocking {@link Iterator}
|
||||||
|
*/
|
||||||
|
@ThreadSafe
|
||||||
|
private static class BlockingResponseStream<T> implements Iterator<T>, StreamObserver<T> {
|
||||||
|
|
||||||
|
private final LinkedBlockingQueue<Object> buffer;
|
||||||
|
private final int maxBufferSize;
|
||||||
|
private Object last;
|
||||||
|
private boolean done = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construct a buffering iterator so that blocking clients can consume the response stream.
|
||||||
|
* @param maxBufferSize limit on number of messages in the buffer before the stream is
|
||||||
|
* terminated.
|
||||||
|
*/
|
||||||
|
private BlockingResponseStream(int maxBufferSize) {
|
||||||
|
buffer = new LinkedBlockingQueue<Object>();
|
||||||
|
this.maxBufferSize = maxBufferSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onValue(T value) {
|
||||||
|
Preconditions.checkState(!done, "Call to onValue afer onError/onCompleted");
|
||||||
|
if (buffer.size() >= maxBufferSize) {
|
||||||
|
// Throw the exception as observables are required to propagate it to onError
|
||||||
|
throw new CancellationException("Buffer size exceeded");
|
||||||
|
} else {
|
||||||
|
buffer.offer(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onError(Throwable t) {
|
||||||
|
Preconditions.checkState(!done, "Call to onError after call to onError/onCompleted");
|
||||||
|
buffer.offer(t);
|
||||||
|
done = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onCompleted() {
|
||||||
|
Preconditions.checkState(!done, "Call to onCompleted after call to onError/onCompleted");
|
||||||
|
buffer.offer(this);
|
||||||
|
done = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized boolean hasNext() {
|
||||||
|
// Will block here indefinitely waiting for content, RPC timeouts defend against
|
||||||
|
// permanent hangs here as onError will be reported.
|
||||||
|
try {
|
||||||
|
last = (last == null) ? buffer.take() : last;
|
||||||
|
if (last instanceof Throwable) {
|
||||||
|
throw Throwables.propagate((Throwable) last);
|
||||||
|
}
|
||||||
|
return last != this;
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
Thread.interrupted();
|
||||||
|
throw Status.fromThrowable(ie).asRuntimeException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized T next() {
|
||||||
|
if (!hasNext()) {
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
|
T tmp = (T) last;
|
||||||
|
last = null;
|
||||||
|
return tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,17 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import javax.annotation.concurrent.ThreadSafe;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An abstraction layer between stubs and the transport details. Channels are responsible for
|
||||||
|
* call initiation and tracking. Channels can be decorated to provide cross-cutting behaviors
|
||||||
|
* across all operations in a stub.
|
||||||
|
*/
|
||||||
|
@ThreadSafe
|
||||||
|
public interface Channel {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepare a call to the given service method.
|
||||||
|
*/
|
||||||
|
public <ReqT, RespT> Call<ReqT, RespT> prepare(MethodDescriptor<ReqT, RespT> method);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An typed abstraction over message serialization.
|
||||||
|
*/
|
||||||
|
public interface Marshaller<T> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a message produce an {@link InputStream} for it.
|
||||||
|
*/
|
||||||
|
// TODO(user): Switch to ByteSource equivalent when ready
|
||||||
|
public InputStream stream(T value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given an {@link InputStream} parse it into an instance of the declared type.
|
||||||
|
*/
|
||||||
|
public T parse(InputStream stream);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,37 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import com.google.net.stubby.DeferredProtoInputStream;
|
||||||
|
import com.google.net.stubby.Status;
|
||||||
|
import com.google.net.stubby.transport.Transport;
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
import com.google.protobuf.MessageLite;
|
||||||
|
import com.google.protobuf.Parser;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility functions for working with Marshallers.
|
||||||
|
*/
|
||||||
|
public class Marshallers {
|
||||||
|
|
||||||
|
private Marshallers() {}
|
||||||
|
|
||||||
|
public static <T extends MessageLite> Marshaller<T> forProto(final Parser<T> parser) {
|
||||||
|
return new Marshaller<T>() {
|
||||||
|
@Override
|
||||||
|
public InputStream stream(T value) {
|
||||||
|
return new DeferredProtoInputStream(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public T parse(InputStream stream) {
|
||||||
|
try {
|
||||||
|
return parser.parseFrom(stream);
|
||||||
|
} catch (InvalidProtocolBufferException ipbe) {
|
||||||
|
throw new Status(Transport.Code.INTERNAL, "Invalid protobuf byte sequence", ipbe)
|
||||||
|
.asRuntimeException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,8 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
public interface MessageSink<E> {
|
||||||
|
|
||||||
|
public void receive(E message, boolean last);
|
||||||
|
|
||||||
|
public void close();
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,6 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
public interface MessageSource<E> {
|
||||||
|
|
||||||
|
public void produceToSink(MessageSink<E> sink);
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,74 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.annotation.concurrent.Immutable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Descriptor for a single operation, used by Channel to execute a call.
|
||||||
|
*/
|
||||||
|
@Immutable
|
||||||
|
public class MethodDescriptor<RequestT, ResponseT> {
|
||||||
|
private final String name;
|
||||||
|
// TODO(user): Make more generic than just proto? Use Function?
|
||||||
|
private final Marshaller<RequestT> requestMarshaller;
|
||||||
|
private final Marshaller<ResponseT> responseMarshaller;
|
||||||
|
private final long timeoutMicros;
|
||||||
|
|
||||||
|
public static <RequestT, ResponseT> MethodDescriptor<RequestT, ResponseT> create(
|
||||||
|
String name, long timeout, TimeUnit timeoutUnit,
|
||||||
|
Marshaller<RequestT> requestMarshaller,
|
||||||
|
Marshaller<ResponseT> responseMarshaller) {
|
||||||
|
return new MethodDescriptor<RequestT, ResponseT>(
|
||||||
|
name, timeoutUnit.toMicros(timeout), requestMarshaller, responseMarshaller);
|
||||||
|
}
|
||||||
|
|
||||||
|
private MethodDescriptor(String name, long timeoutMicros,
|
||||||
|
Marshaller<RequestT> requestMarshaller,
|
||||||
|
Marshaller<ResponseT> responseMarshaller) {
|
||||||
|
this.name = name;
|
||||||
|
Preconditions.checkArgument(timeoutMicros > 0);
|
||||||
|
this.timeoutMicros = timeoutMicros;
|
||||||
|
this.requestMarshaller = requestMarshaller;
|
||||||
|
this.responseMarshaller = responseMarshaller;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The fully qualified name of the method
|
||||||
|
*/
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Timeout for the operation in microseconds
|
||||||
|
*/
|
||||||
|
public long getTimeout() {
|
||||||
|
return timeoutMicros;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse a response payload from the given {@link InputStream}
|
||||||
|
*/
|
||||||
|
public ResponseT parseResponse(InputStream input) {
|
||||||
|
return responseMarshaller.parse(input);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a request message to an {@link InputStream}
|
||||||
|
*/
|
||||||
|
public InputStream streamRequest(RequestT requestMessage) {
|
||||||
|
return requestMarshaller.stream(requestMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new descriptor with a different timeout
|
||||||
|
*/
|
||||||
|
public MethodDescriptor withTimeout(long timeout, TimeUnit unit) {
|
||||||
|
return new MethodDescriptor(name , unit.toMicros(timeout),
|
||||||
|
requestMarshaller, responseMarshaller);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,123 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import com.google.net.stubby.AbstractResponse;
|
||||||
|
import com.google.net.stubby.Operation;
|
||||||
|
import com.google.net.stubby.Request;
|
||||||
|
import com.google.net.stubby.Response;
|
||||||
|
import com.google.net.stubby.Session;
|
||||||
|
import com.google.net.stubby.Status;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A temporary shim layer between the new (Channel) and the old (Session). Will go away when the
|
||||||
|
* new transport layer is created.
|
||||||
|
*/
|
||||||
|
// TODO(user): Delete this class when new transport interfaces are introduced
|
||||||
|
public class SessionCall<RequestT, ResponseT>
|
||||||
|
extends CallContext implements Call<RequestT, ResponseT> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The {@link Request} used by the stub to dispatch the call
|
||||||
|
*/
|
||||||
|
private Request request;
|
||||||
|
|
||||||
|
private StreamObserver<ResponseT> responseObserver;
|
||||||
|
|
||||||
|
private final MethodDescriptor<RequestT, ResponseT> methodDescriptor;
|
||||||
|
private final Session session;
|
||||||
|
|
||||||
|
protected SessionCall(MethodDescriptor<RequestT, ResponseT> methodDescriptor, Session session) {
|
||||||
|
// This will go away when we introduce new transport API.... nothing to see here
|
||||||
|
this.methodDescriptor = methodDescriptor;
|
||||||
|
this.session = session;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start(StreamObserver<ResponseT> responseObserver) {
|
||||||
|
request = session.startRequest(methodDescriptor.getName(), new Response.ResponseBuilder() {
|
||||||
|
@Override
|
||||||
|
public Response build(int id) {
|
||||||
|
return new CallResponse(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Response build() {
|
||||||
|
return new CallResponse(-1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
this.responseObserver = responseObserver;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onValue(RequestT value) {
|
||||||
|
request.addPayload(methodDescriptor.streamRequest(value), Operation.Phase.PAYLOAD);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An error occurred while producing the request output. Cancel the request
|
||||||
|
* and close the response stream.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
request.close(Status.fromThrowable(t));
|
||||||
|
this.responseObserver.onError(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
request.close(Status.OK);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adapts the transport layer response to calls on the response observer or
|
||||||
|
* recorded context state.
|
||||||
|
*/
|
||||||
|
private class CallResponse extends AbstractResponse {
|
||||||
|
|
||||||
|
private CallResponse(int id) {
|
||||||
|
super(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Operation addContext(String type, InputStream message, Phase nextPhase) {
|
||||||
|
try {
|
||||||
|
SessionCall.this.addResponseContext(type, message);
|
||||||
|
return super.addContext(type, message, nextPhase);
|
||||||
|
} finally {
|
||||||
|
if (getPhase() == Phase.CLOSED) {
|
||||||
|
propagateClosed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Operation addPayload(InputStream payload, Phase nextPhase) {
|
||||||
|
try {
|
||||||
|
SessionCall.this.responseObserver.onValue(methodDescriptor.parseResponse(payload));
|
||||||
|
return super.addPayload(payload, nextPhase);
|
||||||
|
} finally {
|
||||||
|
if (getPhase() == Phase.CLOSED) {
|
||||||
|
propagateClosed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Operation close(Status status) {
|
||||||
|
try {
|
||||||
|
return super.close(status);
|
||||||
|
} finally {
|
||||||
|
propagateClosed();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void propagateClosed() {
|
||||||
|
if (Status.OK.getCode() == getStatus().getCode()) {
|
||||||
|
SessionCall.this.responseObserver.onCompleted();
|
||||||
|
} else {
|
||||||
|
SessionCall.this.responseObserver.onError(getStatus().asRuntimeException());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import com.google.net.stubby.Session;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is a shim between Session & Channel. Will be removed when the new transport
|
||||||
|
* API is introduced.
|
||||||
|
*/
|
||||||
|
public class SessionChannel implements Channel {
|
||||||
|
private final Session session;
|
||||||
|
|
||||||
|
public SessionChannel(Session session) {
|
||||||
|
this.session = session;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <ReqT, RespT> SessionCall<ReqT, RespT> prepare(MethodDescriptor<ReqT, RespT> method) {
|
||||||
|
return new SessionCall<ReqT, RespT>(method, session);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,41 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receives notifications from an observable stream of messages.
|
||||||
|
*/
|
||||||
|
// TODO(user): Consider whether we need to interact with flow-control at this layer. E.g.
|
||||||
|
// public ListenableFuture<Void> onValue(V value). Do we layer it in here or as an additional
|
||||||
|
// interface? Interaction with flow control can be done by blocking here.
|
||||||
|
public interface StreamObserver<V> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receive a value from the stream.
|
||||||
|
* <p>
|
||||||
|
* Can be called many times but is never called after onError or onCompleted are called.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* If an exception is thrown by an implementation the caller is expected to terminate the
|
||||||
|
* stream by calling {@linkplain #onError(Throwable)} with the caught exception prior to
|
||||||
|
* propagating it.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
public void onValue(V value);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Receive a terminating error from the stream.
|
||||||
|
* <p>
|
||||||
|
* May only be called once and is never called after onCompleted. In particular if an exception
|
||||||
|
* is thrown by an implementation of onError no further calls to any method are allowed.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
public void onError(Throwable t);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notifies successful stream completion.
|
||||||
|
* <p>
|
||||||
|
* May only be called once and is never called after onError. In particular if an exception is
|
||||||
|
* thrown by an implementation of onCompleted no further calls to any method are allowed.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
public void onCompleted();
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,98 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility implementation of {link StreamObserver} used in testing. Records all the observed
|
||||||
|
* values produced by the stream as well as any errors.
|
||||||
|
*/
|
||||||
|
public class StreamRecorder<T> implements StreamObserver<T> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new recorder.
|
||||||
|
*/
|
||||||
|
public static <T> StreamRecorder<T> create() {
|
||||||
|
return new StreamRecorder<T>();
|
||||||
|
}
|
||||||
|
|
||||||
|
private final CountDownLatch latch;
|
||||||
|
private final List<T> results;
|
||||||
|
private Throwable error;
|
||||||
|
private final SettableFuture<T> firstValue;
|
||||||
|
|
||||||
|
private StreamRecorder() {
|
||||||
|
firstValue = SettableFuture.create();
|
||||||
|
latch = new CountDownLatch(1);
|
||||||
|
results = Collections.synchronizedList(Lists.<T>newArrayList());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onValue(T value) {
|
||||||
|
if (!firstValue.isDone()) {
|
||||||
|
firstValue.set(value);
|
||||||
|
}
|
||||||
|
results.add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
if (!firstValue.isDone()) {
|
||||||
|
firstValue.setException(t);
|
||||||
|
}
|
||||||
|
error = t;
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onCompleted() {
|
||||||
|
if (!firstValue.isDone()) {
|
||||||
|
firstValue.setException(new IllegalStateException("No first value provided"));
|
||||||
|
}
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait for the stream to terminate.
|
||||||
|
*/
|
||||||
|
public void awaitCompletion() throws Exception {
|
||||||
|
latch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait a fixed timeout for the stream to terminate.
|
||||||
|
*/
|
||||||
|
public boolean awaitCompletion(int timeout, TimeUnit unit) throws Exception {
|
||||||
|
return latch.await(timeout, unit);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the current set of received values.
|
||||||
|
*/
|
||||||
|
public List<T> getValues() {
|
||||||
|
return Collections.unmodifiableList(results);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the stream terminating error.
|
||||||
|
*/
|
||||||
|
@Nullable public Throwable getError() {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return a {@link ListenableFuture} for the first value received from the stream. Useful
|
||||||
|
* for testing unary call patterns.
|
||||||
|
*/
|
||||||
|
public ListenableFuture<T> firstValue() {
|
||||||
|
return firstValue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,38 @@
|
||||||
|
package com.google.net.stubby.stub;
|
||||||
|
|
||||||
|
import com.google.net.stubby.DeferredProtoInputStream;
|
||||||
|
import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
|
import com.google.protobuf.MessageLite;
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* StubDescriptor used by generated stubs
|
||||||
|
*/
|
||||||
|
// TODO(user): Should really be an interface
|
||||||
|
public class StubDescriptor<I extends MessageLite, O extends MessageLite> {
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
private final O defaultO;
|
||||||
|
|
||||||
|
public StubDescriptor(String name, O defaultO) {
|
||||||
|
this.name = name;
|
||||||
|
this.defaultO = defaultO;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public O parseResponse(InputStream input) {
|
||||||
|
try {
|
||||||
|
return (O) defaultO.getParserForType().parseFrom(input);
|
||||||
|
} catch (InvalidProtocolBufferException ipbe) {
|
||||||
|
throw new RuntimeException(ipbe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public InputStream streamRequest(I input) {
|
||||||
|
return new DeferredProtoInputStream(input);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue