diff --git a/stub/src/main/java/com/google/net/stubby/stub/AbstractServiceDescriptor.java b/stub/src/main/java/com/google/net/stubby/stub/AbstractServiceDescriptor.java new file mode 100644 index 0000000000..c5e0d7544c --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/AbstractServiceDescriptor.java @@ -0,0 +1,21 @@ +package com.google.net.stubby.stub; + +import com.google.common.collect.ImmutableList; + +import java.util.Map; + +/** + * Base class for all stub configurations. + */ +public abstract class AbstractServiceDescriptor { + + /** + * Returns the list of operations defined in the stub configuration. + */ + public abstract ImmutableList methods(); + + /** + * Returns a new stub configuration for the provided method configurations. + */ + protected abstract T build(Map methodMap); +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java b/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java new file mode 100644 index 0000000000..81024aea5c --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/AbstractStub.java @@ -0,0 +1,80 @@ +package com.google.net.stubby.stub; + +import com.google.common.collect.Maps; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Common base type for stub implementations. Allows for reconfiguration. + */ +// TODO(user): Move into 3rd party when tidy +// TOOD(lryan/kevinb): Excessive parameterization can be a pain, try to eliminate once the generated +// code is more tangible. +public abstract class AbstractStub> { + protected final Channel channel; + protected final C config; + + /** + * Constructor for use by subclasses. + */ + protected AbstractStub(Channel channel, C config) { + this.channel = channel; + this.config = config; + } + + protected C getServiceDescriptor() { + return config; + } + + public StubConfigBuilder configureNewStub() { + return new StubConfigBuilder(); + } + + /** + * Returns a new stub configuration for the provided method configurations. + */ + protected abstract S build(Channel channel, C config); + + /** + * Utility class for (re) configuring the operations in a stub. + */ + public class StubConfigBuilder { + + private final Map methodMap; + + private StubConfigBuilder() { + methodMap = Maps.newHashMapWithExpectedSize(config.methods().size()); + for (MethodDescriptor method : AbstractStub.this.config.methods()) { + methodMap.put(method.getName(), method); + } + } + + /** + * Set a timeout for all methods in the stub. + */ + public StubConfigBuilder setTimeout(long timeout, TimeUnit unit) { + for (MethodDescriptor methodDescriptor : methodMap.values()) { + setTimeout(methodDescriptor, timeout, unit); + } + return this; + } + + /** + * Set the timeout for the specified method in microseconds. + */ + private StubConfigBuilder setTimeout(MethodDescriptor method, long timeout, TimeUnit unit) { + // This is the pattern we would use for per-method configuration but currently + // no strong use-cases for it, hence private here + methodMap.put(method.getName(), method.withTimeout(timeout, unit)); + return this; + } + + /** + * Create a new stub configuration + */ + public S build() { + return AbstractStub.this.build(channel, config.build(methodMap)); + } + } +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/Call.java b/stub/src/main/java/com/google/net/stubby/stub/Call.java new file mode 100644 index 0000000000..598ae5d356 --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/Call.java @@ -0,0 +1,17 @@ +package com.google.net.stubby.stub; + +/** + * Encapsulates the producer and consumer aspects of a call. A Call is a deferred execution context + * that does not dispatch data to the remote implementation until 'start' is called. + *

+ * Call and it's sub-classes are used by the stub code generators to produced typed stubs. + *

+ */ +// TODO(user): Implement context support +public interface Call extends StreamObserver { + /** + * Start a call passing it the response {@link StreamObserver}. + * @param responseObserver Which receive response messages. + */ + public void start(StreamObserver responseObserver); +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/CallContext.java b/stub/src/main/java/com/google/net/stubby/stub/CallContext.java new file mode 100644 index 0000000000..81ac4ebbee --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/CallContext.java @@ -0,0 +1,14 @@ +package com.google.net.stubby.stub; + +import java.io.InputStream; + +/** + * Common handling for context data + */ +// TODO(user): Implement +public class CallContext { + + void addResponseContext(String name, InputStream value) { + + } +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/Calls.java b/stub/src/main/java/com/google/net/stubby/stub/Calls.java new file mode 100644 index 0000000000..c8b90c7557 --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/Calls.java @@ -0,0 +1,243 @@ +package com.google.net.stubby.stub; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.net.stubby.Status; +import com.google.net.stubby.transport.Transport; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.CancellationException; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * Utility functions for processing different call idioms. We have one-to-one correspondence + * between utilities in this class and the potential signatures in a generated stub class so + * that the runtime can vary behavior without requiring regeneration of the stub. + */ +public class Calls { + + /** + * Execute a unary call and return a {@link ListenableFuture} to the response. + * @return a future for the single response message. + */ + public static ListenableFuture unaryFutureCall( + Call call, + ReqT param) { + SettableFuture responseFuture = SettableFuture.create(); + asyncServerStreamingCall(call, param, new UnaryStreamToFuture(responseFuture)); + return responseFuture; + } + + /** + * Execute a unary call and block on the response. + * @return the single response message. + */ + public static RespT blockingUnaryCall(Call call, ReqT param) { + return Futures.getUnchecked(unaryFutureCall(call, param)); + } + + /** + * Execute a unary call with a response {@link StreamObserver}. + */ + public static void asyncUnaryCall( + Call call, + ReqT param, + StreamObserver observer) { + asyncServerStreamingCall(call, param, observer); + } + + /** + * Execute a server-streaming call returning a blocking {@link Iterator} over the + * response stream. + * @return an iterator over the response stream. + */ + // TODO(user): Not clear if we want to use this idiom for 'simple' stubs. + public static Iterator blockingServerStreamingCall( + Call call, ReqT param) { + // This is an interesting scenario for flow control... + // TODO(user): Capacity restriction is entirely arbitrary, need to parameterize. + BlockingResponseStream result = new BlockingResponseStream<>(4096); + asyncServerStreamingCall(call, param, result); + return result; + } + + /** + * Execute a server-streaming call with a response {@link StreamObserver}. + */ + public static void asyncServerStreamingCall( + Call call, + ReqT param, + StreamObserver responseObserver) { + call.start(responseObserver); + try { + call.onValue(param); + call.onCompleted(); + } catch (Throwable t) { + call.onError(t); + } + } + + /** + * Execute a client-streaming call with a blocking {@link Iterator} of request messages. + * @return the single response value. + */ + public static RespT blockingClientStreamingCall( + Call call, + Iterator clientStream) { + SettableFuture responseFuture = SettableFuture.create(); + call.start(new UnaryStreamToFuture(responseFuture)); + try { + while (clientStream.hasNext()) { + call.onValue(clientStream.next()); + } + call.onCompleted(); + } catch (Throwable t) { + // Notify runtime of the error which will cancel the call + call.onError(t); + throw Throwables.propagate(t); + } + return Futures.getUnchecked(responseFuture); + } + + /** + * Execute a client-streaming call returning a {@link StreamObserver} for the request messages. + * @return request stream observer. + */ + public static StreamObserver asyncClientStreamingCall( + Call call, + StreamObserver responseObserver) { + return duplexStreamingCall(call, responseObserver); + } + + + /** + * Execute a duplex-streaming call. + * @return request stream observer. + */ + public static StreamObserver duplexStreamingCall( + Call call, StreamObserver responseObserver) { + call.start(responseObserver); + return call; + } + + /** + * Complete a SettableFuture using {@link StreamObserver} events. + */ + private static class UnaryStreamToFuture implements StreamObserver { + private final SettableFuture responseFuture; + private RespT value; + + public UnaryStreamToFuture(SettableFuture responseFuture) { + this.responseFuture = responseFuture; + } + + @Override + public void onValue(RespT value) { + if (this.value != null) { + throw new Status(Transport.Code.INTERNAL, "More than one value received for unary call") + .asRuntimeException(); + } + this.value = value; + } + + @Override + public void onError(Throwable t) { + responseFuture.setException(t); + } + + @Override + public void onCompleted() { + if (value == null) { + // No value received so mark the future as an error + responseFuture.setException( + new Status(Transport.Code.INTERNAL, "No value received for unary call") + .asRuntimeException().fillInStackTrace()); + } + responseFuture.set(value); + } + } + + /** + * Convert events on a {@link StreamObserver} into a blocking {@link Iterator} + */ + @ThreadSafe + private static class BlockingResponseStream implements Iterator, StreamObserver { + + private final LinkedBlockingQueue buffer; + private final int maxBufferSize; + private Object last; + private boolean done = false; + + /** + * Construct a buffering iterator so that blocking clients can consume the response stream. + * @param maxBufferSize limit on number of messages in the buffer before the stream is + * terminated. + */ + private BlockingResponseStream(int maxBufferSize) { + buffer = new LinkedBlockingQueue(); + this.maxBufferSize = maxBufferSize; + } + + @Override + public synchronized void onValue(T value) { + Preconditions.checkState(!done, "Call to onValue afer onError/onCompleted"); + if (buffer.size() >= maxBufferSize) { + // Throw the exception as observables are required to propagate it to onError + throw new CancellationException("Buffer size exceeded"); + } else { + buffer.offer(value); + } + } + + @Override + public synchronized void onError(Throwable t) { + Preconditions.checkState(!done, "Call to onError after call to onError/onCompleted"); + buffer.offer(t); + done = true; + } + + @Override + public synchronized void onCompleted() { + Preconditions.checkState(!done, "Call to onCompleted after call to onError/onCompleted"); + buffer.offer(this); + done = true; + } + + @Override + public synchronized boolean hasNext() { + // Will block here indefinitely waiting for content, RPC timeouts defend against + // permanent hangs here as onError will be reported. + try { + last = (last == null) ? buffer.take() : last; + if (last instanceof Throwable) { + throw Throwables.propagate((Throwable) last); + } + return last != this; + } catch (InterruptedException ie) { + Thread.interrupted(); + throw Status.fromThrowable(ie).asRuntimeException(); + } + } + + @Override + public synchronized T next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + T tmp = (T) last; + last = null; + return tmp; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/Channel.java b/stub/src/main/java/com/google/net/stubby/stub/Channel.java new file mode 100644 index 0000000000..843406708b --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/Channel.java @@ -0,0 +1,17 @@ +package com.google.net.stubby.stub; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * An abstraction layer between stubs and the transport details. Channels are responsible for + * call initiation and tracking. Channels can be decorated to provide cross-cutting behaviors + * across all operations in a stub. + */ +@ThreadSafe +public interface Channel { + + /** + * Prepare a call to the given service method. + */ + public Call prepare(MethodDescriptor method); +} \ No newline at end of file diff --git a/stub/src/main/java/com/google/net/stubby/stub/Marshaller.java b/stub/src/main/java/com/google/net/stubby/stub/Marshaller.java new file mode 100644 index 0000000000..44c46b84e2 --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/Marshaller.java @@ -0,0 +1,20 @@ +package com.google.net.stubby.stub; + +import java.io.InputStream; + +/** + * An typed abstraction over message serialization. + */ +public interface Marshaller { + + /** + * Given a message produce an {@link InputStream} for it. + */ + // TODO(user): Switch to ByteSource equivalent when ready + public InputStream stream(T value); + + /** + * Given an {@link InputStream} parse it into an instance of the declared type. + */ + public T parse(InputStream stream); +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/Marshallers.java b/stub/src/main/java/com/google/net/stubby/stub/Marshallers.java new file mode 100644 index 0000000000..13a7c14aad --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/Marshallers.java @@ -0,0 +1,37 @@ +package com.google.net.stubby.stub; + +import com.google.net.stubby.DeferredProtoInputStream; +import com.google.net.stubby.Status; +import com.google.net.stubby.transport.Transport; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; + +import java.io.InputStream; + +/** + * Utility functions for working with Marshallers. + */ +public class Marshallers { + + private Marshallers() {} + + public static Marshaller forProto(final Parser parser) { + return new Marshaller() { + @Override + public InputStream stream(T value) { + return new DeferredProtoInputStream(value); + } + + @Override + public T parse(InputStream stream) { + try { + return parser.parseFrom(stream); + } catch (InvalidProtocolBufferException ipbe) { + throw new Status(Transport.Code.INTERNAL, "Invalid protobuf byte sequence", ipbe) + .asRuntimeException(); + } + } + }; + } +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/MessageSink.java b/stub/src/main/java/com/google/net/stubby/stub/MessageSink.java new file mode 100644 index 0000000000..e401cd09f0 --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/MessageSink.java @@ -0,0 +1,8 @@ +package com.google.net.stubby.stub; + +public interface MessageSink { + + public void receive(E message, boolean last); + + public void close(); +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/MessageSource.java b/stub/src/main/java/com/google/net/stubby/stub/MessageSource.java new file mode 100644 index 0000000000..8b899c221e --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/MessageSource.java @@ -0,0 +1,6 @@ +package com.google.net.stubby.stub; + +public interface MessageSource { + + public void produceToSink(MessageSink sink); +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/MethodDescriptor.java b/stub/src/main/java/com/google/net/stubby/stub/MethodDescriptor.java new file mode 100644 index 0000000000..f569721185 --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/MethodDescriptor.java @@ -0,0 +1,74 @@ +package com.google.net.stubby.stub; + +import com.google.common.base.Preconditions; + +import java.io.InputStream; +import java.util.concurrent.TimeUnit; + +import javax.annotation.concurrent.Immutable; + +/** + * Descriptor for a single operation, used by Channel to execute a call. + */ +@Immutable +public class MethodDescriptor { + private final String name; + // TODO(user): Make more generic than just proto? Use Function? + private final Marshaller requestMarshaller; + private final Marshaller responseMarshaller; + private final long timeoutMicros; + + public static MethodDescriptor create( + String name, long timeout, TimeUnit timeoutUnit, + Marshaller requestMarshaller, + Marshaller responseMarshaller) { + return new MethodDescriptor( + name, timeoutUnit.toMicros(timeout), requestMarshaller, responseMarshaller); + } + + private MethodDescriptor(String name, long timeoutMicros, + Marshaller requestMarshaller, + Marshaller responseMarshaller) { + this.name = name; + Preconditions.checkArgument(timeoutMicros > 0); + this.timeoutMicros = timeoutMicros; + this.requestMarshaller = requestMarshaller; + this.responseMarshaller = responseMarshaller; + } + + /** + * The fully qualified name of the method + */ + public String getName() { + return name; + } + + /** + * Timeout for the operation in microseconds + */ + public long getTimeout() { + return timeoutMicros; + } + + /** + * Parse a response payload from the given {@link InputStream} + */ + public ResponseT parseResponse(InputStream input) { + return responseMarshaller.parse(input); + } + + /** + * Convert a request message to an {@link InputStream} + */ + public InputStream streamRequest(RequestT requestMessage) { + return requestMarshaller.stream(requestMessage); + } + + /** + * Create a new descriptor with a different timeout + */ + public MethodDescriptor withTimeout(long timeout, TimeUnit unit) { + return new MethodDescriptor(name , unit.toMicros(timeout), + requestMarshaller, responseMarshaller); + } +} \ No newline at end of file diff --git a/stub/src/main/java/com/google/net/stubby/stub/SessionCall.java b/stub/src/main/java/com/google/net/stubby/stub/SessionCall.java new file mode 100644 index 0000000000..1f6962c2e7 --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/SessionCall.java @@ -0,0 +1,123 @@ +package com.google.net.stubby.stub; + +import com.google.net.stubby.AbstractResponse; +import com.google.net.stubby.Operation; +import com.google.net.stubby.Request; +import com.google.net.stubby.Response; +import com.google.net.stubby.Session; +import com.google.net.stubby.Status; + +import java.io.InputStream; + +/** + * A temporary shim layer between the new (Channel) and the old (Session). Will go away when the + * new transport layer is created. + */ +// TODO(user): Delete this class when new transport interfaces are introduced +public class SessionCall + extends CallContext implements Call { + + /** + * The {@link Request} used by the stub to dispatch the call + */ + private Request request; + + private StreamObserver responseObserver; + + private final MethodDescriptor methodDescriptor; + private final Session session; + + protected SessionCall(MethodDescriptor methodDescriptor, Session session) { + // This will go away when we introduce new transport API.... nothing to see here + this.methodDescriptor = methodDescriptor; + this.session = session; + } + + @Override + public void start(StreamObserver responseObserver) { + request = session.startRequest(methodDescriptor.getName(), new Response.ResponseBuilder() { + @Override + public Response build(int id) { + return new CallResponse(id); + } + + @Override + public Response build() { + return new CallResponse(-1); + } + }); + this.responseObserver = responseObserver; + } + + @Override + public void onValue(RequestT value) { + request.addPayload(methodDescriptor.streamRequest(value), Operation.Phase.PAYLOAD); + } + + /** + * An error occurred while producing the request output. Cancel the request + * and close the response stream. + */ + @Override + public void onError(Throwable t) { + request.close(Status.fromThrowable(t)); + this.responseObserver.onError(t); + } + + @Override + public void onCompleted() { + request.close(Status.OK); + } + + /** + * Adapts the transport layer response to calls on the response observer or + * recorded context state. + */ + private class CallResponse extends AbstractResponse { + + private CallResponse(int id) { + super(id); + } + + @Override + public Operation addContext(String type, InputStream message, Phase nextPhase) { + try { + SessionCall.this.addResponseContext(type, message); + return super.addContext(type, message, nextPhase); + } finally { + if (getPhase() == Phase.CLOSED) { + propagateClosed(); + } + } + } + + @Override + public Operation addPayload(InputStream payload, Phase nextPhase) { + try { + SessionCall.this.responseObserver.onValue(methodDescriptor.parseResponse(payload)); + return super.addPayload(payload, nextPhase); + } finally { + if (getPhase() == Phase.CLOSED) { + propagateClosed(); + } + } + } + + @Override + public Operation close(Status status) { + try { + return super.close(status); + } finally { + propagateClosed(); + } + } + + private void propagateClosed() { + if (Status.OK.getCode() == getStatus().getCode()) { + SessionCall.this.responseObserver.onCompleted(); + } else { + SessionCall.this.responseObserver.onError(getStatus().asRuntimeException()); + } + } + } +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/SessionChannel.java b/stub/src/main/java/com/google/net/stubby/stub/SessionChannel.java new file mode 100644 index 0000000000..216e0e94e2 --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/SessionChannel.java @@ -0,0 +1,20 @@ +package com.google.net.stubby.stub; + +import com.google.net.stubby.Session; + +/** + * This class is a shim between Session & Channel. Will be removed when the new transport + * API is introduced. + */ +public class SessionChannel implements Channel { + private final Session session; + + public SessionChannel(Session session) { + this.session = session; + } + + @Override + public SessionCall prepare(MethodDescriptor method) { + return new SessionCall(method, session); + } +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/StreamObserver.java b/stub/src/main/java/com/google/net/stubby/stub/StreamObserver.java new file mode 100644 index 0000000000..9cb502514c --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/StreamObserver.java @@ -0,0 +1,41 @@ +package com.google.net.stubby.stub; + +/** + * Receives notifications from an observable stream of messages. + */ +// TODO(user): Consider whether we need to interact with flow-control at this layer. E.g. +// public ListenableFuture onValue(V value). Do we layer it in here or as an additional +// interface? Interaction with flow control can be done by blocking here. +public interface StreamObserver { + + /** + * Receive a value from the stream. + *

+ * Can be called many times but is never called after onError or onCompleted are called. + *

+ *

+ * If an exception is thrown by an implementation the caller is expected to terminate the + * stream by calling {@linkplain #onError(Throwable)} with the caught exception prior to + * propagating it. + *

+ */ + public void onValue(V value); + + /** + * Receive a terminating error from the stream. + *

+ * May only be called once and is never called after onCompleted. In particular if an exception + * is thrown by an implementation of onError no further calls to any method are allowed. + *

+ */ + public void onError(Throwable t); + + /** + * Notifies successful stream completion. + *

+ * May only be called once and is never called after onError. In particular if an exception is + * thrown by an implementation of onCompleted no further calls to any method are allowed. + *

+ */ + public void onCompleted(); +} \ No newline at end of file diff --git a/stub/src/main/java/com/google/net/stubby/stub/StreamRecorder.java b/stub/src/main/java/com/google/net/stubby/stub/StreamRecorder.java new file mode 100644 index 0000000000..6b49af782e --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/StreamRecorder.java @@ -0,0 +1,98 @@ +package com.google.net.stubby.stub; + +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; + +/** + * Utility implementation of {link StreamObserver} used in testing. Records all the observed + * values produced by the stream as well as any errors. + */ +public class StreamRecorder implements StreamObserver { + + /** + * Create a new recorder. + */ + public static StreamRecorder create() { + return new StreamRecorder(); + } + + private final CountDownLatch latch; + private final List results; + private Throwable error; + private final SettableFuture firstValue; + + private StreamRecorder() { + firstValue = SettableFuture.create(); + latch = new CountDownLatch(1); + results = Collections.synchronizedList(Lists.newArrayList()); + } + + @Override + public void onValue(T value) { + if (!firstValue.isDone()) { + firstValue.set(value); + } + results.add(value); + } + + @Override + public void onError(Throwable t) { + if (!firstValue.isDone()) { + firstValue.setException(t); + } + error = t; + latch.countDown(); + } + + @Override + public void onCompleted() { + if (!firstValue.isDone()) { + firstValue.setException(new IllegalStateException("No first value provided")); + } + latch.countDown(); + } + + /** + * Wait for the stream to terminate. + */ + public void awaitCompletion() throws Exception { + latch.await(); + } + + /** + * Wait a fixed timeout for the stream to terminate. + */ + public boolean awaitCompletion(int timeout, TimeUnit unit) throws Exception { + return latch.await(timeout, unit); + } + + /** + * Return the current set of received values. + */ + public List getValues() { + return Collections.unmodifiableList(results); + } + + /** + * Return the stream terminating error. + */ + @Nullable public Throwable getError() { + return error; + } + + /** + * Return a {@link ListenableFuture} for the first value received from the stream. Useful + * for testing unary call patterns. + */ + public ListenableFuture firstValue() { + return firstValue; + } +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/StubDescriptor.java b/stub/src/main/java/com/google/net/stubby/stub/StubDescriptor.java new file mode 100644 index 0000000000..dc4380628e --- /dev/null +++ b/stub/src/main/java/com/google/net/stubby/stub/StubDescriptor.java @@ -0,0 +1,38 @@ +package com.google.net.stubby.stub; + +import com.google.net.stubby.DeferredProtoInputStream; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.MessageLite; + +import java.io.InputStream; + +/** + * StubDescriptor used by generated stubs + */ +// TODO(user): Should really be an interface +public class StubDescriptor { + + private final String name; + private final O defaultO; + + public StubDescriptor(String name, O defaultO) { + this.name = name; + this.defaultO = defaultO; + } + + public String getName() { + return name; + } + + public O parseResponse(InputStream input) { + try { + return (O) defaultO.getParserForType().parseFrom(input); + } catch (InvalidProtocolBufferException ipbe) { + throw new RuntimeException(ipbe); + } + } + + public InputStream streamRequest(I input) { + return new DeferredProtoInputStream(input); + } +}