Custom metadata and call cancellation. (#22)

Added an example showing how to send/receive custom metadata, and handle
call cancellation.

Implemented the remaining parts of metadata and cancel handling.

Addresses part of #8 and #12.
This commit is contained in:
Jakob Andersen 2017-07-07 14:31:36 +02:00 committed by GitHub
parent aefc45cbc0
commit a5e740c41a
15 changed files with 711 additions and 66 deletions

View File

@ -0,0 +1,25 @@
# Description
The metadata server and client demonstrate how to handle custom metadata,
cancellation, and timeouts in Dart gRPC.
See the definition of the metadata service in `protos/metadata.proto`.
# Run the sample code
To compile and run the example, assuming you are in the root of the metadata
folder, i.e., .../example/metadata/, first get the dependencies by running:
```sh
$ pub get
```
Then, to run the server:
```sh
$ dart bin/server.dart
```
Likewise, to run the client:
```sh
$ dart bin/client.dart
```

View File

@ -0,0 +1,9 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:metadata/src/client.dart';
main(List<String> args) {
new Client().main(args);
}

View File

@ -0,0 +1,9 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:metadata/src/server.dart';
main(List<String> args) {
new Server().main(args);
}

View File

@ -0,0 +1,107 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:grpc/grpc.dart';
import 'generated/metadata.pbgrpc.dart';
class Client {
ClientChannel channel;
MetadataClient stub;
Future<Null> main(List<String> args) async {
channel = new ClientChannel('127.0.0.1', port: 8080);
stub = new MetadataClient(channel);
// Run all of the demos in order.
await runEcho();
await runEchoDelayCancel();
await runAddOneCancel();
await runFibonacciCancel();
await runFibonacciTimeout();
await channel.close();
}
// Run the echo demo. ...
Future<Null> runEcho() async {
final request = new Record()..value = 'Kaj';
final call = stub.echo(request,
options: new CallOptions(metadata: {'peer': 'Verner'}));
call.headers.then((headers) {
print('Received header metadata: $headers');
});
call.trailers.then((trailers) {
print('Received trailer metadata: $trailers');
});
final response = await call;
print('Echo response: ${response.value}');
}
// Run the echo with delay cancel demo. ...
Future<Null> runEchoDelayCancel() async {
final channelWithOptions = new ClientChannel('127.0.0.1',
port: 8080, options: new CallOptions(metadata: {'peer': 'Verner'}));
final stubWithCustomChannel = new MetadataClient(channelWithOptions);
final request = new Record()..value = 'Kaj';
final call = stubWithCustomChannel.echo(request,
options: new CallOptions(metadata: {'delay': '1'}));
call.headers.then((headers) {
print('Received header metadata: $headers');
});
call.trailers.then((trailers) {
print('Received trailer metadata: $trailers');
});
await new Future.delayed(new Duration(milliseconds: 10));
call.cancel();
try {
final response = await call;
print('Unexpected echo response: ${response.value}');
} catch (error) {
print('Expected error: $error');
}
await channelWithOptions.close();
}
// Run the addOne cancel demo.
Future<Null> runAddOneCancel() async {
final numbers = new StreamController<int>();
final call =
stub.addOne(numbers.stream.map((value) => new Number()..value = value));
final receivedThree = new Completer<bool>();
final sub = call.listen((number) {
print('AddOneCancel: Received ${number.value}');
if (number.value == 3) {
receivedThree.complete(true);
}
});
numbers.add(1);
numbers.add(2);
numbers.add(3);
numbers.add(4);
await receivedThree.future;
await call.cancel();
await Future.wait([sub.cancel(), numbers.close()]);
}
/// Call an RPC that returns a stream of Fibonacci numbers. Cancel the call
/// after receiving more than 5 responses.
Future<Null> runFibonacciCancel() async {
final call = stub.fibonacci(new Empty());
int count = 0;
await for (var number in call) {
count++;
print('Received ${number.value} (count=$count)');
if (count > 5) {
await call.cancel();
}
}
print('Final count: $count');
}
// Run the timeout demo. ...
Future<Null> runFibonacciTimeout() async {
// TODO(jakobr): Implement timeouts.
}
}

View File

@ -0,0 +1,109 @@
///
// Generated code. Do not modify.
///
// ignore_for_file: non_constant_identifier_names
// ignore_for_file: library_prefixes
library grpc_metadata;
// ignore: UNUSED_SHOWN_NAME
import 'dart:core' show int, bool, double, String, List, override;
import 'package:protobuf/protobuf.dart';
class Record extends GeneratedMessage {
static final BuilderInfo _i = new BuilderInfo('Record')
..a/*<String>*/(1, 'value', PbFieldType.OS)
..hasRequiredFields = false;
Record() : super();
Record.fromBuffer(List<int> i,
[ExtensionRegistry r = ExtensionRegistry.EMPTY])
: super.fromBuffer(i, r);
Record.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY])
: super.fromJson(i, r);
Record clone() => new Record()..mergeFromMessage(this);
BuilderInfo get info_ => _i;
static Record create() => new Record();
static PbList<Record> createRepeated() => new PbList<Record>();
static Record getDefault() {
if (_defaultInstance == null) _defaultInstance = new _ReadonlyRecord();
return _defaultInstance;
}
static Record _defaultInstance;
static void $checkItem(Record v) {
if (v is! Record) checkItemFailed(v, 'Record');
}
String get value => $_get(0, 1, '');
set value(String v) {
$_setString(0, 1, v);
}
bool hasValue() => $_has(0, 1);
void clearValue() => clearField(1);
}
class _ReadonlyRecord extends Record with ReadonlyMessageMixin {}
class Number extends GeneratedMessage {
static final BuilderInfo _i = new BuilderInfo('Number')
..a/*<int>*/(1, 'value', PbFieldType.O3)
..hasRequiredFields = false;
Number() : super();
Number.fromBuffer(List<int> i,
[ExtensionRegistry r = ExtensionRegistry.EMPTY])
: super.fromBuffer(i, r);
Number.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY])
: super.fromJson(i, r);
Number clone() => new Number()..mergeFromMessage(this);
BuilderInfo get info_ => _i;
static Number create() => new Number();
static PbList<Number> createRepeated() => new PbList<Number>();
static Number getDefault() {
if (_defaultInstance == null) _defaultInstance = new _ReadonlyNumber();
return _defaultInstance;
}
static Number _defaultInstance;
static void $checkItem(Number v) {
if (v is! Number) checkItemFailed(v, 'Number');
}
int get value => $_get(0, 1, 0);
set value(int v) {
$_setUnsignedInt32(0, 1, v);
}
bool hasValue() => $_has(0, 1);
void clearValue() => clearField(1);
}
class _ReadonlyNumber extends Number with ReadonlyMessageMixin {}
class Empty extends GeneratedMessage {
static final BuilderInfo _i = new BuilderInfo('Empty')
..hasRequiredFields = false;
Empty() : super();
Empty.fromBuffer(List<int> i, [ExtensionRegistry r = ExtensionRegistry.EMPTY])
: super.fromBuffer(i, r);
Empty.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY])
: super.fromJson(i, r);
Empty clone() => new Empty()..mergeFromMessage(this);
BuilderInfo get info_ => _i;
static Empty create() => new Empty();
static PbList<Empty> createRepeated() => new PbList<Empty>();
static Empty getDefault() {
if (_defaultInstance == null) _defaultInstance = new _ReadonlyEmpty();
return _defaultInstance;
}
static Empty _defaultInstance;
static void $checkItem(Empty v) {
if (v is! Empty) checkItemFailed(v, 'Empty');
}
}
class _ReadonlyEmpty extends Empty with ReadonlyMessageMixin {}

View File

@ -0,0 +1,94 @@
///
// Generated code. Do not modify.
///
// ignore_for_file: non_constant_identifier_names
// ignore_for_file: library_prefixes
library grpc_metadata_pbgrpc;
import 'dart:async';
import 'package:grpc/grpc.dart';
import 'metadata.pb.dart';
export 'metadata.pb.dart';
class MetadataClient {
final ClientChannel _channel;
static final _$echo = new ClientMethod<Record, Record>(
'/grpc.Metadata/Echo',
(Record value) => value.writeToBuffer(),
(List<int> value) => new Record.fromBuffer(value));
static final _$addOne = new ClientMethod<Number, Number>(
'/grpc.Metadata/AddOne',
(Number value) => value.writeToBuffer(),
(List<int> value) => new Number.fromBuffer(value));
static final _$fibonacci = new ClientMethod<Empty, Number>(
'/grpc.Metadata/Fibonacci',
(Empty value) => value.writeToBuffer(),
(List<int> value) => new Number.fromBuffer(value));
MetadataClient(this._channel);
ResponseFuture<Record> echo(Record request, {CallOptions options}) {
final call = new ClientCall(_channel, _$echo, options: options);
call.request
..add(request)
..close();
return new ResponseFuture(call);
}
ResponseStream<Number> addOne(Stream<Number> request, {CallOptions options}) {
final call = new ClientCall(_channel, _$addOne, options: options);
request.pipe(call.request);
return new ResponseStream(call);
}
ResponseStream<Number> fibonacci(Empty request, {CallOptions options}) {
final call = new ClientCall(_channel, _$fibonacci, options: options);
call.request
..add(request)
..close();
return new ResponseStream(call);
}
}
abstract class MetadataServiceBase extends Service {
String get $name => 'grpc.Metadata';
MetadataServiceBase() {
$addMethod(new ServiceMethod(
'Echo',
echo_Pre,
false,
false,
(List<int> value) => new Record.fromBuffer(value),
(Record value) => value.writeToBuffer()));
$addMethod(new ServiceMethod(
'AddOne',
addOne,
true,
true,
(List<int> value) => new Number.fromBuffer(value),
(Number value) => value.writeToBuffer()));
$addMethod(new ServiceMethod(
'Fibonacci',
fibonacci_Pre,
false,
true,
(List<int> value) => new Empty.fromBuffer(value),
(Number value) => value.writeToBuffer()));
}
Future<Record> echo_Pre(ServiceCall call, Future<Record> request) async {
return echo(call, await request);
}
Stream<Number> fibonacci_Pre(ServiceCall call, Future<Empty> request) async* {
yield* fibonacci(call, await request);
}
Future<Record> echo(ServiceCall call, Record request);
Stream<Number> addOne(ServiceCall call, Stream<Number> request);
Stream<Number> fibonacci(ServiceCall call, Empty request);
}

View File

@ -0,0 +1,74 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'package:grpc/grpc.dart' as grpc;
import 'generated/metadata.pbgrpc.dart';
class MetadataService extends MetadataServiceBase {
int callCount = 0;
@override
Future<Record> echo(grpc.ServiceCall call, Record request) async {
final peer = call.clientMetadata['peer'];
final count = callCount++;
print('Echo: Call #$count: Peer: $peer, request: ${request.value}');
call.headers['count'] = '${count}';
call.trailers['hello'] = request.value;
final delay = call.clientMetadata['delay'];
if (delay != null) {
await new Future.delayed(new Duration(seconds: int.parse(delay)));
}
return new Record()..value = peer;
}
@override
Stream<Number> addOne(grpc.ServiceCall call, Stream<Number> request) async* {
int lastNumber = -1;
try {
await for (var number in request) {
lastNumber = number.value;
yield new Number()..value = number.value + 1;
}
} catch (error) {
print('Caught: $error, last number = $lastNumber');
} finally {
if (call.isCanceled) {
print('AddOne: Call canceled');
}
}
}
/// Streams a Fibonacci number every 500ms until the call is canceled.
Stream<Number> fibonacci(grpc.ServiceCall call, Empty request) async* {
int previous = 0;
int current = 1;
try {
while (true) {
await new Future.delayed(new Duration(milliseconds: 500));
yield new Number()..value = current;
final next = current + previous;
previous = current;
current = next;
}
} finally {
if (call.isCanceled) {
print('Fibonacci: Canceled.');
}
}
}
}
class Server {
Future<Null> main(List<String> args) async {
final server = new grpc.Server(port: 8080)
..addService(new MetadataService());
await server.serve();
print('Server listening...');
}
}

View File

@ -0,0 +1,67 @@
// Copyright 2017, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
// * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
syntax = "proto3";
package grpc;
// Interface exported by the server.
service Metadata {
// Echo metadata.
//
// Echoes the given input as trailer metadata. Sets a call counter as header
// metadata, and returns the value of the 'hello' key in the client metadata
// as the result.
rpc Echo(Record) returns (Record) {}
// Adds 1 to the numbers in the request stream.
//
// Uses bidirectional streaming.
rpc AddOne(stream Number) returns (stream Number) {}
// Fibonacci.
//
// Streams Fibonacci numbers until the call is canceled or times out.
rpc Fibonacci(Empty) returns (stream Number) {}
}
// A message containing a single string value.
message Record {
string value = 1;
}
// A message containing a single number.
message Number {
int32 value = 1;
}
// A message containing nothing.
message Empty {
}

View File

@ -0,0 +1,17 @@
name: metadata
description: Dart gRPC sample client and server.
version: 0.0.1
homepage: https://github.com/dart-lang/grpc-dart
environment:
sdk: '>=1.20.1 <2.0.0'
dependencies:
async: ^1.13.3
grpc:
path: ../../
protobuf: ^0.5.4
http2: ^0.1.2
dev_dependencies:
test: ^0.12.0

View File

@ -0,0 +1,4 @@
#!/usr/bin/env bash
protoc --dart_out=grpc:lib/src/generated -Iprotos protos/metadata.proto
rm lib/src/generated/metadata.pb{enum,json}.dart
dartfmt -w lib/src/generated

View File

@ -2,8 +2,6 @@
The route guide server and client demonstrate how to use Dart gRPC libraries to
perform unary, client streaming, server streaming and full duplex RPCs.
Please refer to [gRPC Basics: Dart]() for more information.
See the definition of the route guide service in `protos/route_guide.proto`.
# Run the sample code

View File

@ -15,12 +15,9 @@ class Client {
ClientChannel channel;
RouteGuideClient stub;
Client() {
Future<Null> main(List<String> args) async {
channel = new ClientChannel('127.0.0.1', port: 8080);
stub = new RouteGuideClient(channel);
}
Future<Null> main(List<String> args) async {
// Run all of the demos in order.
await runGetFeature();
await runListFeatures();
@ -128,9 +125,9 @@ class Client {
}
final call = stub.routeChat(outgoingNotes());
await call.forEach((note) {
await for (var note in call) {
print('Got message ${note.message} at ${note.location.latitude}, ${note
.location.longitude}');
});
}
}
}

View File

@ -12,16 +12,44 @@ import 'package:http2/transport.dart';
import 'shared.dart';
import 'streams.dart';
const _reservedHeaders = const [
'content-type',
'te',
'grpc-timeout',
'grpc-accept-encoding',
'user-agent',
];
/// Runtime options for a RPC call.
class CallOptions {
final Map<String, String> metadata;
final Duration timeout;
CallOptions._(this.metadata, this.timeout);
factory CallOptions({Map<String, String> metadata, Duration timeout}) {
final sanitizedMetadata = <String, String>{};
metadata?.forEach((key, value) {
final lowerCaseKey = key.toLowerCase();
if (!lowerCaseKey.startsWith(':') &&
!_reservedHeaders.contains(lowerCaseKey)) {
sanitizedMetadata[lowerCaseKey] = value;
}
});
return new CallOptions._(new Map.unmodifiable(sanitizedMetadata), timeout);
}
}
/// A channel to an RPC endpoint.
class ClientChannel {
final String host;
final int port;
final CallOptions options;
final List<Socket> _sockets = [];
final List<TransportConnection> _connections = [];
// TODO(jakobr): Channel options.
ClientChannel(this.host, {this.port = 8080});
ClientChannel(this.host, {this.port = 8080, this.options});
/// Returns a connection to this [Channel]'s RPC endpoint. The connection may
/// be shared between multiple RPC calls.
@ -76,32 +104,65 @@ class ClientCall<Q, R> implements Response {
StreamController<R> _responses;
StreamSubscription<GrpcMessage> _responseSubscription;
final Map<String, String> metadata;
final CallOptions options;
ClientCall(this._channel, this._method, {this.metadata = const {}}) {
Future<Null> _callSetup;
ClientCall(this._channel, this._method, {this.options}) {
_responses = new StreamController(onListen: _onResponseListen);
_call().catchError((error) {
_callSetup = _initiateCall().catchError((error) {
_responses.addError(
new GrpcError(1703, 'Error connecting: ${error.toString()}'));
});
}
Future<Null> _call() async {
/// Convert [timeout] to grpc-timeout header string format.
// Mostly inspired by grpc-java implementation.
// TODO(jakobr): Modify to match grpc/core implementation instead.
static String toTimeoutString(Duration duration) {
const cutoff = 100000;
final timeout = duration.inMicroseconds;
if (timeout < 0) {
// Smallest possible timeout.
return '1n';
} else if (timeout < cutoff) {
return '${timeout}u';
} else if (timeout < cutoff * 1000) {
return '${timeout~/1000}m';
} else if (timeout < cutoff * 1000 * 1000) {
return '${timeout~/1000000}S';
} else if (timeout < cutoff * 1000 * 1000 * 60) {
return '${timeout~/60000000}M';
} else {
return '${timeout~/3600000000}H';
}
}
Future<Null> _initiateCall() async {
final connection = await _channel.connect();
final timeout = options?.timeout ?? _channel.options?.timeout;
// TODO(jakobr): Populate HTTP-specific headers in connection?
final headers = <Header>[
_methodPost,
_schemeHttp,
new Header.ascii(':path', _method.path),
new Header.ascii(':authority', _channel.host),
new Header.ascii('grpc-timeout', '5S'),
];
if (timeout != null) {
headers.add(new Header.ascii('grpc-timeout', toTimeoutString(timeout)));
}
headers.addAll([
_contentTypeGrpc,
_teTrailers,
_grpcAcceptEncoding,
_userAgent,
];
metadata.forEach((key, value) {
// TODO(jakobr): Filter out headers owned by gRPC.
]);
// TODO(jakobr): Flip this around, and have the Channel create the call
// object and apply options (including the above TODO).
final customMetadata = <String, String>{};
customMetadata.addAll(_channel.options?.metadata ?? {});
customMetadata.addAll(options?.metadata ?? {});
customMetadata.forEach((key, value) {
headers.add(new Header.ascii(key, value));
});
_stream = connection.makeRequest(headers);
@ -250,11 +311,16 @@ class ClientCall<Q, R> implements Response {
@override
Future<Null> cancel() async {
_stream.terminate();
final futures = <Future>[
_requests.close(),
_responses.close(),
];
_callSetup.whenComplete(() {
// Terminate the stream if the call connects after being canceled.
_stream?.terminate();
});
// Don't await _responses.close() here. It'll only complete once the done
// event has been delivered, and it's the caller of this function that is
// reading from responses as well, so we might end up deadlocked.
_responses.close();
_stream?.terminate();
final futures = <Future>[_requests.close()];
if (_responseSubscription != null) {
futures.add(_responseSubscription.cancel());
}

View File

@ -95,32 +95,36 @@ class Server {
///
/// Gives the method handler access to custom metadata from the client, and
/// ability to set custom metadata on the header/trailer sent to the client.
abstract class ServiceCall {
class ServiceCall {
final ServerHandler _handler;
ServiceCall(this._handler);
/// Custom metadata from the client.
Map<String, String> get clientMetadata;
Map<String, String> get clientMetadata => _handler._clientMetadata;
/// Custom metadata to be sent to the client. Will be [null] once the headers
/// have been sent, either when [sendHeaders] is called, or when the first
/// response message is sent.
Map<String, String> get headers;
Map<String, String> get headers => _handler._customHeaders;
/// Custom metadata to be sent to the client after all response messages.
Map<String, String> get trailers;
Map<String, String> get trailers => _handler._customTrailers;
/// Deadline for this call. If the call is still active after this time, then
/// the client or server may cancel it.
DateTime get deadline;
DateTime get deadline => _handler._deadline;
/// Returns [true] if the [deadline] has been exceeded.
bool get isTimedOut;
bool get isTimedOut => _handler._isTimedOut;
/// Returns [true] if the client has canceled this call.
bool get isCanceled;
bool get isCanceled => _handler._isCanceled;
/// Send response headers. This is done automatically before sending the first
/// response message, but can be done manually before the first response is
/// ready, if necessary.
void sendHeaders();
void sendHeaders() => _handler._sendHeaders();
/// Send response trailers. A trailer indicating success ([status] == 0) will
/// be sent automatically when all responses are sent. This method can be used
@ -128,7 +132,9 @@ abstract class ServiceCall {
///
/// The call will be closed after calling this method, and no further
/// responses can be sent.
void sendTrailer(int status, [String statusMessage]);
/// responses can be sent.
void sendTrailer(int status, [String statusMessage]) =>
_handler._sendTrailers(status: status, message: statusMessage);
}
/// Handles an incoming gRPC call.
@ -144,12 +150,27 @@ class ServerHandler {
StreamController _requests;
bool _hasReceivedRequest = false;
Stream _responses;
StreamSubscription _responseSubscription;
bool _headersSent = false;
Map<String, String> _customHeaders = {};
Map<String, String> _customTrailers = {};
DateTime _deadline;
bool _isCanceled = false;
ServerHandler(this._methodLookup, this._stream);
bool get isCanceled => _isCanceled;
bool get _isTimedOut => _deadline?.isBefore(new DateTime.now()) ?? false;
void handle() {
_stream.onTerminated = (int errorCode) {
_isCanceled = true;
_responseSubscription?.cancel();
};
_incomingSubscription = _stream.incomingMessages
.transform(new GrpcHttpDecoder())
.transform(grpcDecompressor())
@ -176,7 +197,7 @@ class ServerHandler {
final method = path[2];
_descriptor = _methodLookup(service, method);
if (_descriptor == null) {
_sendError(404, 'Method not found');
_sendError(404, 'Path /$service/$method not found');
return;
}
_startStreamingRequest();
@ -190,23 +211,23 @@ class ServerHandler {
onResume: _incomingSubscription.resume);
_incomingSubscription.onData(_onDataActive);
Stream responses;
final context = new ServiceCall(this);
if (_descriptor.streamingResponse) {
if (_descriptor.streamingRequest) {
responses = _descriptor.handler(null, _requests.stream);
_responses = _descriptor.handler(context, _requests.stream);
} else {
responses = _descriptor.handler(null, _requests.stream.single);
_responses = _descriptor.handler(context, _requests.stream.single);
}
} else {
Future response;
if (_descriptor.streamingRequest) {
response = _descriptor.handler(null, _requests.stream);
response = _descriptor.handler(context, _requests.stream);
} else {
response = _descriptor.handler(null, _requests.stream.single);
response = _descriptor.handler(context, _requests.stream.single);
}
responses = response.asStream();
_responses = response.asStream();
}
_responseSubscription = responses.listen(_onResponse,
_responseSubscription = _responses.listen(_onResponse,
onError: _onResponseError,
onDone: _onResponseDone,
cancelOnError: true);
@ -251,9 +272,23 @@ class ServerHandler {
// -- Active state, outgoing response data --
void _onResponse(response) {
_ensureHeadersSent();
final bytes = _descriptor.responseSerializer(response);
_stream.sendData(GrpcHttpEncoder.frame(bytes));
try {
if (!_headersSent) {
_sendHeaders();
}
final bytes = _descriptor.responseSerializer(response);
_stream.sendData(GrpcHttpEncoder.frame(bytes));
} catch (error) {
_responseSubscription.cancel();
if (!_requests.isClosed) {
// If we can, alert the handler that things are going wrong.
_requests
.addError(new GrpcError(1001, 'Error sending response: $error'));
_requests.close();
}
_incomingSubscription.cancel();
_stream.terminate();
}
}
void _onResponseDone() {
@ -269,36 +304,44 @@ class ServerHandler {
}
}
void _ensureHeadersSent() {
if (_headersSent) return;
_sendHeaders();
}
void _sendHeaders() {
if (_headersSent) throw new GrpcError(1514, 'Headers already sent');
final headers = [
new Header.ascii(':status',
200.toString()), // TODO(jakobr): Should really be on package:http2.
new Header.ascii('content-type', 'application/grpc'),
];
// headers.addAll(context.headers);
final headersMap = <String, String>{};
headersMap.addAll(_customHeaders);
_customHeaders = null;
// TODO(jakobr): Should come from package:http2?
headersMap[':status'] = '200';
headersMap['content-type'] = 'application/grpc';
final headers = <Header>[];
headersMap
.forEach((key, value) => headers.add(new Header.ascii(key, value)));
_stream.sendHeaders(headers);
_headersSent = true;
}
void _sendTrailers({int status = 0, String message}) {
final trailers = <Header>[];
final trailersMap = <String, String>{};
if (!_headersSent) {
trailers.addAll([
new Header.ascii(':status', 200.toString()),
new Header.ascii('content-type', 'application/grpc'),
]);
trailersMap.addAll(_customHeaders);
_customHeaders = null;
}
trailers.add(new Header.ascii('grpc-status', status.toString()));
trailersMap.addAll(_customTrailers);
_customTrailers = null;
if (!_headersSent) {
// TODO(jakobr): Should come from package:http2?
trailersMap[':status'] = '200';
trailersMap['content-type'] = 'application/grpc';
}
trailersMap['grpc-status'] = status.toString();
if (message != null) {
trailers.add(new Header.ascii('grpc-message', message));
trailersMap['grpc-message'] = message;
}
// trailers.addAll(context.trailers);
final trailers = <Header>[];
trailersMap
.forEach((key, value) => trailers.add(new Header.ascii(key, value)));
_stream.sendHeaders(trailers, endStream: true);
// We're done!
_incomingSubscription.cancel();
@ -308,9 +351,11 @@ class ServerHandler {
// -- All states, incoming error / stream closed --
void _onError(error) {
print('Stream error: $error');
// TODO(jakobr): Handle. Might be a cancel request from the client, which
// should be propagated.
// Exception from the incoming stream. Most likely a cancel request from the
// client, so we treat it as such.
_isCanceled = true;
_requests.addError(new GrpcError(1001, 'Canceled'));
_responseSubscription?.cancel();
}
void _onDoneError() {
@ -327,7 +372,6 @@ class ServerHandler {
}
void _sendError(int status, String message) {
print('Sending error $status: $message');
_sendTrailers(status: status, message: message);
}
}

25
test/timeout_test.dart Normal file
View File

@ -0,0 +1,25 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'package:test/test.dart';
import 'package:grpc/src/client.dart';
void main() {
test('Timeouts are converted correctly', () {
expect(ClientCall.toTimeoutString(new Duration(microseconds: -1)), '1n');
expect(ClientCall.toTimeoutString(new Duration(microseconds: 0)), '0u');
expect(ClientCall.toTimeoutString(new Duration(microseconds: 107)), '107u');
expect(ClientCall.toTimeoutString(new Duration(hours: 2, microseconds: 17)),
'7200S');
expect(ClientCall.toTimeoutString(new Duration(milliseconds: 1420665)),
'1420S');
expect(
ClientCall.toTimeoutString(new Duration(seconds: 2, microseconds: 3)),
'2000m');
expect(
ClientCall.toTimeoutString(new Duration(seconds: 2, milliseconds: 3)),
'2003m');
});
}