Rename onValue to onNext in StreamObserver to align with the naming in Rx & the proposed Flow API in

Java9
This commit is contained in:
Louis Ryan 2015-08-26 14:15:30 -07:00
parent 15f02ba19c
commit a4c821d94e
14 changed files with 54 additions and 54 deletions

View File

@ -372,7 +372,7 @@ public abstract class AbstractBenchmark {
for (int i = 0; i < callsPerChannel; i++) {
StreamObserver<ByteBuf> observer = new StreamObserver<ByteBuf>() {
@Override
public void onValue(ByteBuf value) {
public void onNext(ByteBuf value) {
counter.addAndGet(counterDelta);
}
@ -414,10 +414,10 @@ public abstract class AbstractBenchmark {
streamingCall,
new StreamObserver<ByteBuf>() {
@Override
public void onValue(ByteBuf value) {
public void onNext(ByteBuf value) {
if (!done.get()) {
counter.addAndGet(counterDelta);
requestObserverRef.get().onValue(request.slice());
requestObserverRef.get().onNext(request.slice());
streamingCall.request(1);
}
}
@ -432,8 +432,8 @@ public abstract class AbstractBenchmark {
}
});
requestObserverRef.set(requestObserver);
requestObserver.onValue(request.slice());
requestObserver.onValue(request.slice());
requestObserver.onNext(request.slice());
requestObserver.onNext(request.slice());
}
}
}
@ -457,7 +457,7 @@ public abstract class AbstractBenchmark {
streamingCall,
new StreamObserver<ByteBuf>() {
@Override
public void onValue(ByteBuf value) {
public void onNext(ByteBuf value) {
if (!done.get()) {
counter.addAndGet(counterDelta);
streamingCall.request(1);
@ -474,7 +474,7 @@ public abstract class AbstractBenchmark {
}
});
requestObserverRef.set(requestObserver);
requestObserver.onValue(request.slice());
requestObserver.onNext(request.slice());
}
}
}

View File

@ -179,7 +179,7 @@ public class AsyncClient {
long lastCall = System.nanoTime();
@Override
public void onValue(SimpleResponse value) {
public void onNext(SimpleResponse value) {
}
@Override
@ -220,7 +220,7 @@ public class AsyncClient {
StreamObserver<SimpleRequest> requestObserver = stub.streamingCall(responseObserver);
responseObserver.requestObserver = requestObserver;
requestObserver.onValue(request);
requestObserver.onNext(request);
return future;
}
@ -249,14 +249,14 @@ public class AsyncClient {
}
@Override
public void onValue(SimpleResponse value) {
public void onNext(SimpleResponse value) {
long now = System.nanoTime();
// Record the latencies in microseconds
histogram.recordValue((now - lastCall) / 1000);
lastCall = now;
if (endTime > now) {
requestObserver.onValue(request);
requestObserver.onNext(request);
} else {
requestObserver.onCompleted();
}

View File

@ -184,7 +184,7 @@ public class AsyncServer {
@Override
public void unaryCall(SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
SimpleResponse response = buildSimpleResponse(request);
responseObserver.onValue(response);
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@ -193,9 +193,9 @@ public class AsyncServer {
final StreamObserver<SimpleResponse> responseObserver) {
return new StreamObserver<SimpleRequest>() {
@Override
public void onValue(SimpleRequest request) {
public void onNext(SimpleRequest request) {
SimpleResponse response = buildSimpleResponse(request);
responseObserver.onValue(response);
responseObserver.onNext(response);
}
@Override

View File

@ -203,7 +203,7 @@ public class OpenLoopClient {
private final long start = System.nanoTime();
@Override
public void onValue(SimpleResponse value) {
public void onNext(SimpleResponse value) {
}
@Override

View File

@ -87,7 +87,7 @@ public class CustomHeaderServer {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloResponse> responseObserver) {
HelloResponse reply = HelloResponse.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onValue(reply);
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

View File

@ -82,7 +82,7 @@ public class HelloWorldServer {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloResponse> responseObserver) {
HelloResponse reply = HelloResponse.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onValue(reply);
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

View File

@ -132,7 +132,7 @@ public class RouteGuideClient {
final SettableFuture<Void> finishFuture = SettableFuture.create();
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
public void onValue(RouteSummary summary) {
public void onNext(RouteSummary summary) {
info("Finished trip with {0} points. Passed {1} features. "
+ "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
@ -159,7 +159,7 @@ public class RouteGuideClient {
Point point = features.get(index).getLocation();
info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
RouteGuideUtil.getLongitude(point));
requestObserver.onValue(point);
requestObserver.onNext(point);
// Sleep for a bit before sending the next one.
Thread.sleep(rand.nextInt(1000) + 500);
if (finishFuture.isDone()) {
@ -188,7 +188,7 @@ public class RouteGuideClient {
StreamObserver<RouteNote> requestObserver =
asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
public void onValue(RouteNote note) {
public void onNext(RouteNote note) {
info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
.getLatitude(), note.getLocation().getLongitude());
}
@ -212,7 +212,7 @@ public class RouteGuideClient {
for (RouteNote request : requests) {
info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
.getLatitude(), request.getLocation().getLongitude());
requestObserver.onValue(request);
requestObserver.onNext(request);
}
requestObserver.onCompleted();

View File

@ -131,7 +131,7 @@ public class RouteGuideServer {
*/
@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
responseObserver.onValue(checkFeature(request));
responseObserver.onNext(checkFeature(request));
responseObserver.onCompleted();
}
@ -156,7 +156,7 @@ public class RouteGuideServer {
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
responseObserver.onValue(feature);
responseObserver.onNext(feature);
}
}
responseObserver.onCompleted();
@ -179,7 +179,7 @@ public class RouteGuideServer {
long startTime = System.nanoTime();
@Override
public void onValue(Point point) {
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
@ -200,7 +200,7 @@ public class RouteGuideServer {
@Override
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
responseObserver.onValue(RouteSummary.newBuilder().setPointCount(pointCount)
responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
responseObserver.onCompleted();
@ -219,12 +219,12 @@ public class RouteGuideServer {
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
public void onValue(RouteNote note) {
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
// Respond with all previous notes at this location.
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
responseObserver.onValue(prevNote);
responseObserver.onNext(prevNote);
}
// Now add the new note to the list

View File

@ -240,7 +240,7 @@ public abstract class AbstractTransportTest {
StreamObserver<StreamingInputCallRequest> requestObserver =
asyncStub.streamingInputCall(responseObserver);
for (StreamingInputCallRequest request : requests) {
requestObserver.onValue(request);
requestObserver.onNext(request);
}
requestObserver.onCompleted();
assertEquals(goldenResponse, responseObserver.firstValue().get());
@ -300,8 +300,8 @@ public abstract class AbstractTransportTest {
StreamObserver<StreamingOutputCallRequest> requestObserver
= asyncStub.fullDuplexCall(responseObserver);
for (int i = 0; i < requests.size(); i++) {
requestObserver.onValue(requests.get(i));
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponses.get(i));
requestObserver.onNext(requests.get(i));
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponses.get(i));
verifyNoMoreInteractions(responseObserver);
}
requestObserver.onCompleted();
@ -349,8 +349,8 @@ public abstract class AbstractTransportTest {
StreamObserver<StreamingOutputCallResponse> responseObserver = mock(StreamObserver.class);
StreamObserver<StreamingOutputCallRequest> requestObserver
= asyncStub.fullDuplexCall(responseObserver);
requestObserver.onValue(request);
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponse);
requestObserver.onNext(request);
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponse);
verifyNoMoreInteractions(responseObserver);
requestObserver.onError(new RuntimeException());
@ -378,7 +378,7 @@ public abstract class AbstractTransportTest {
final int numRequests = 10;
for (int ix = numRequests; ix > 0; --ix) {
requestStream.onValue(request);
requestStream.onNext(request);
}
requestStream.onCompleted();
recorder.awaitCompletion();
@ -410,7 +410,7 @@ public abstract class AbstractTransportTest {
final int numRequests = 10;
for (int ix = numRequests; ix > 0; --ix) {
requestStream.onValue(request);
requestStream.onNext(request);
}
requestStream.onCompleted();
recorder.awaitCompletion();
@ -570,7 +570,7 @@ public abstract class AbstractTransportTest {
final int numRequests = 10;
for (int ix = numRequests; ix > 0; --ix) {
requestStream.onValue(request);
requestStream.onNext(request);
}
requestStream.onCompleted();
recorder.awaitCompletion();
@ -695,15 +695,15 @@ public abstract class AbstractTransportTest {
StreamObserver<StreamingOutputCallResponse> responseObserver = mock(StreamObserver.class);
StreamObserver<StreamingOutputCallRequest> requestObserver
= asyncStub.fullDuplexCall(responseObserver);
requestObserver.onValue(requests.get(0));
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponses.get(0));
requestObserver.onNext(requests.get(0));
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponses.get(0));
// Initiate graceful shutdown.
channel.shutdown();
requestObserver.onValue(requests.get(1));
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponses.get(1));
requestObserver.onNext(requests.get(1));
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponses.get(1));
// The previous ping-pong could have raced with the shutdown, but this one certainly shouldn't.
requestObserver.onValue(requests.get(2));
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponses.get(2));
requestObserver.onNext(requests.get(2));
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponses.get(2));
requestObserver.onCompleted();
verify(responseObserver, timeout(OPERATION_TIMEOUT)).onCompleted();
verifyNoMoreInteractions(responseObserver);

View File

@ -78,7 +78,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService {
@Override
public void emptyCall(EmptyProtos.Empty empty,
StreamObserver<EmptyProtos.Empty> responseObserver) {
responseObserver.onValue(EmptyProtos.Empty.getDefaultInstance());
responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance());
responseObserver.onCompleted();
}
@ -101,7 +101,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService {
.setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE)
.setBody(payload);
}
responseObserver.onValue(responseBuilder.build());
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
}
@ -127,13 +127,13 @@ public class TestServiceImpl implements TestServiceGrpc.TestService {
private int totalPayloadSize;
@Override
public void onValue(StreamingInputCallRequest message) {
public void onNext(StreamingInputCallRequest message) {
totalPayloadSize += message.getPayload().getBody().size();
}
@Override
public void onCompleted() {
responseObserver.onValue(StreamingInputCallResponse.newBuilder()
responseObserver.onNext(StreamingInputCallResponse.newBuilder()
.setAggregatedPayloadSize(totalPayloadSize).build());
responseObserver.onCompleted();
}
@ -155,7 +155,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService {
final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver);
return new StreamObserver<StreamingOutputCallRequest>() {
@Override
public void onValue(StreamingOutputCallRequest request) {
public void onNext(StreamingOutputCallRequest request) {
dispatcher.enqueue(toChunkQueue(request));
}
@ -182,7 +182,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService {
final Queue<Chunk> chunks = new LinkedList<Chunk>();
return new StreamObserver<StreamingOutputCallRequest>() {
@Override
public void onValue(StreamingOutputCallRequest request) {
public void onNext(StreamingOutputCallRequest request) {
chunks.addAll(toChunkQueue(request));
}
@ -271,7 +271,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService {
// Pop off the next chunk and send it to the client.
Chunk chunk = chunks.remove();
responseStream.onValue(chunk.toResponse());
responseStream.onNext(chunk.toResponse());
} catch (Throwable e) {
responseStream.onError(e);

View File

@ -206,7 +206,7 @@ public class ClientCalls {
}
@Override
public void onValue(T value) {
public void onNext(T value) {
call.sendMessage(value);
}
@ -248,7 +248,7 @@ public class ClientCalls {
.asRuntimeException();
}
firstResponseReceived = true;
observer.onValue(message);
observer.onNext(message);
if (streamingResponse) {
// Request delivery of the next inbound message.

View File

@ -186,7 +186,7 @@ public class ServerCalls {
@Override
public void onMessage(ReqT request) {
requestObserver.onValue(request);
requestObserver.onNext(request);
// Request delivery of the next inbound message.
call.request(1);
@ -227,7 +227,7 @@ public class ServerCalls {
}
@Override
public void onValue(RespT response) {
public void onNext(RespT response) {
if (cancelled) {
throw Status.CANCELLED.asRuntimeException();
}

View File

@ -55,7 +55,7 @@ public interface StreamObserver<V> {
*
* @param value the value passed to the stream
*/
void onValue(V value);
void onNext(V value);
/**
* Receives a terminating error from the stream.

View File

@ -67,7 +67,7 @@ public class StreamRecorder<T> implements StreamObserver<T> {
}
@Override
public void onValue(T value) {
public void onNext(T value) {
if (!firstValue.isDone()) {
firstValue.set(value);
}