diff --git a/example/metadata/README.md b/example/metadata/README.md new file mode 100644 index 0000000..947605c --- /dev/null +++ b/example/metadata/README.md @@ -0,0 +1,25 @@ +# Description +The metadata server and client demonstrate how to handle custom metadata, +cancellation, and timeouts in Dart gRPC. + +See the definition of the metadata service in `protos/metadata.proto`. + +# Run the sample code +To compile and run the example, assuming you are in the root of the metadata +folder, i.e., .../example/metadata/, first get the dependencies by running: + +```sh +$ pub get +``` + +Then, to run the server: + +```sh +$ dart bin/server.dart +``` + +Likewise, to run the client: + +```sh +$ dart bin/client.dart +``` diff --git a/example/metadata/bin/client.dart b/example/metadata/bin/client.dart new file mode 100644 index 0000000..a917d3e --- /dev/null +++ b/example/metadata/bin/client.dart @@ -0,0 +1,9 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:metadata/src/client.dart'; + +main(List args) { + new Client().main(args); +} diff --git a/example/metadata/bin/server.dart b/example/metadata/bin/server.dart new file mode 100644 index 0000000..6af89ac --- /dev/null +++ b/example/metadata/bin/server.dart @@ -0,0 +1,9 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:metadata/src/server.dart'; + +main(List args) { + new Server().main(args); +} diff --git a/example/metadata/lib/src/client.dart b/example/metadata/lib/src/client.dart new file mode 100644 index 0000000..ecc3f5e --- /dev/null +++ b/example/metadata/lib/src/client.dart @@ -0,0 +1,107 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:grpc/grpc.dart'; + +import 'generated/metadata.pbgrpc.dart'; + +class Client { + ClientChannel channel; + MetadataClient stub; + + Future main(List args) async { + channel = new ClientChannel('127.0.0.1', port: 8080); + stub = new MetadataClient(channel); + // Run all of the demos in order. + await runEcho(); + await runEchoDelayCancel(); + await runAddOneCancel(); + await runFibonacciCancel(); + await runFibonacciTimeout(); + await channel.close(); + } + + // Run the echo demo. ... + Future runEcho() async { + final request = new Record()..value = 'Kaj'; + final call = stub.echo(request, + options: new CallOptions(metadata: {'peer': 'Verner'})); + call.headers.then((headers) { + print('Received header metadata: $headers'); + }); + call.trailers.then((trailers) { + print('Received trailer metadata: $trailers'); + }); + final response = await call; + print('Echo response: ${response.value}'); + } + + // Run the echo with delay cancel demo. ... + Future runEchoDelayCancel() async { + final channelWithOptions = new ClientChannel('127.0.0.1', + port: 8080, options: new CallOptions(metadata: {'peer': 'Verner'})); + final stubWithCustomChannel = new MetadataClient(channelWithOptions); + final request = new Record()..value = 'Kaj'; + final call = stubWithCustomChannel.echo(request, + options: new CallOptions(metadata: {'delay': '1'})); + call.headers.then((headers) { + print('Received header metadata: $headers'); + }); + call.trailers.then((trailers) { + print('Received trailer metadata: $trailers'); + }); + await new Future.delayed(new Duration(milliseconds: 10)); + call.cancel(); + try { + final response = await call; + print('Unexpected echo response: ${response.value}'); + } catch (error) { + print('Expected error: $error'); + } + await channelWithOptions.close(); + } + + // Run the addOne cancel demo. + Future runAddOneCancel() async { + final numbers = new StreamController(); + final call = + stub.addOne(numbers.stream.map((value) => new Number()..value = value)); + final receivedThree = new Completer(); + final sub = call.listen((number) { + print('AddOneCancel: Received ${number.value}'); + if (number.value == 3) { + receivedThree.complete(true); + } + }); + numbers.add(1); + numbers.add(2); + numbers.add(3); + numbers.add(4); + await receivedThree.future; + await call.cancel(); + await Future.wait([sub.cancel(), numbers.close()]); + } + + /// Call an RPC that returns a stream of Fibonacci numbers. Cancel the call + /// after receiving more than 5 responses. + Future runFibonacciCancel() async { + final call = stub.fibonacci(new Empty()); + int count = 0; + await for (var number in call) { + count++; + print('Received ${number.value} (count=$count)'); + if (count > 5) { + await call.cancel(); + } + } + print('Final count: $count'); + } + + // Run the timeout demo. ... + Future runFibonacciTimeout() async { + // TODO(jakobr): Implement timeouts. + } +} diff --git a/example/metadata/lib/src/generated/metadata.pb.dart b/example/metadata/lib/src/generated/metadata.pb.dart new file mode 100644 index 0000000..960b012 --- /dev/null +++ b/example/metadata/lib/src/generated/metadata.pb.dart @@ -0,0 +1,109 @@ +/// +// Generated code. Do not modify. +/// +// ignore_for_file: non_constant_identifier_names +// ignore_for_file: library_prefixes +library grpc_metadata; + +// ignore: UNUSED_SHOWN_NAME +import 'dart:core' show int, bool, double, String, List, override; + +import 'package:protobuf/protobuf.dart'; + +class Record extends GeneratedMessage { + static final BuilderInfo _i = new BuilderInfo('Record') + ..a/**/(1, 'value', PbFieldType.OS) + ..hasRequiredFields = false; + + Record() : super(); + Record.fromBuffer(List i, + [ExtensionRegistry r = ExtensionRegistry.EMPTY]) + : super.fromBuffer(i, r); + Record.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) + : super.fromJson(i, r); + Record clone() => new Record()..mergeFromMessage(this); + BuilderInfo get info_ => _i; + static Record create() => new Record(); + static PbList createRepeated() => new PbList(); + static Record getDefault() { + if (_defaultInstance == null) _defaultInstance = new _ReadonlyRecord(); + return _defaultInstance; + } + + static Record _defaultInstance; + static void $checkItem(Record v) { + if (v is! Record) checkItemFailed(v, 'Record'); + } + + String get value => $_get(0, 1, ''); + set value(String v) { + $_setString(0, 1, v); + } + + bool hasValue() => $_has(0, 1); + void clearValue() => clearField(1); +} + +class _ReadonlyRecord extends Record with ReadonlyMessageMixin {} + +class Number extends GeneratedMessage { + static final BuilderInfo _i = new BuilderInfo('Number') + ..a/**/(1, 'value', PbFieldType.O3) + ..hasRequiredFields = false; + + Number() : super(); + Number.fromBuffer(List i, + [ExtensionRegistry r = ExtensionRegistry.EMPTY]) + : super.fromBuffer(i, r); + Number.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) + : super.fromJson(i, r); + Number clone() => new Number()..mergeFromMessage(this); + BuilderInfo get info_ => _i; + static Number create() => new Number(); + static PbList createRepeated() => new PbList(); + static Number getDefault() { + if (_defaultInstance == null) _defaultInstance = new _ReadonlyNumber(); + return _defaultInstance; + } + + static Number _defaultInstance; + static void $checkItem(Number v) { + if (v is! Number) checkItemFailed(v, 'Number'); + } + + int get value => $_get(0, 1, 0); + set value(int v) { + $_setUnsignedInt32(0, 1, v); + } + + bool hasValue() => $_has(0, 1); + void clearValue() => clearField(1); +} + +class _ReadonlyNumber extends Number with ReadonlyMessageMixin {} + +class Empty extends GeneratedMessage { + static final BuilderInfo _i = new BuilderInfo('Empty') + ..hasRequiredFields = false; + + Empty() : super(); + Empty.fromBuffer(List i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) + : super.fromBuffer(i, r); + Empty.fromJson(String i, [ExtensionRegistry r = ExtensionRegistry.EMPTY]) + : super.fromJson(i, r); + Empty clone() => new Empty()..mergeFromMessage(this); + BuilderInfo get info_ => _i; + static Empty create() => new Empty(); + static PbList createRepeated() => new PbList(); + static Empty getDefault() { + if (_defaultInstance == null) _defaultInstance = new _ReadonlyEmpty(); + return _defaultInstance; + } + + static Empty _defaultInstance; + static void $checkItem(Empty v) { + if (v is! Empty) checkItemFailed(v, 'Empty'); + } +} + +class _ReadonlyEmpty extends Empty with ReadonlyMessageMixin {} diff --git a/example/metadata/lib/src/generated/metadata.pbgrpc.dart b/example/metadata/lib/src/generated/metadata.pbgrpc.dart new file mode 100644 index 0000000..a2b8795 --- /dev/null +++ b/example/metadata/lib/src/generated/metadata.pbgrpc.dart @@ -0,0 +1,94 @@ +/// +// Generated code. Do not modify. +/// +// ignore_for_file: non_constant_identifier_names +// ignore_for_file: library_prefixes +library grpc_metadata_pbgrpc; + +import 'dart:async'; + +import 'package:grpc/grpc.dart'; + +import 'metadata.pb.dart'; +export 'metadata.pb.dart'; + +class MetadataClient { + final ClientChannel _channel; + + static final _$echo = new ClientMethod( + '/grpc.Metadata/Echo', + (Record value) => value.writeToBuffer(), + (List value) => new Record.fromBuffer(value)); + static final _$addOne = new ClientMethod( + '/grpc.Metadata/AddOne', + (Number value) => value.writeToBuffer(), + (List value) => new Number.fromBuffer(value)); + static final _$fibonacci = new ClientMethod( + '/grpc.Metadata/Fibonacci', + (Empty value) => value.writeToBuffer(), + (List value) => new Number.fromBuffer(value)); + + MetadataClient(this._channel); + + ResponseFuture echo(Record request, {CallOptions options}) { + final call = new ClientCall(_channel, _$echo, options: options); + call.request + ..add(request) + ..close(); + return new ResponseFuture(call); + } + + ResponseStream addOne(Stream request, {CallOptions options}) { + final call = new ClientCall(_channel, _$addOne, options: options); + request.pipe(call.request); + return new ResponseStream(call); + } + + ResponseStream fibonacci(Empty request, {CallOptions options}) { + final call = new ClientCall(_channel, _$fibonacci, options: options); + call.request + ..add(request) + ..close(); + return new ResponseStream(call); + } +} + +abstract class MetadataServiceBase extends Service { + String get $name => 'grpc.Metadata'; + + MetadataServiceBase() { + $addMethod(new ServiceMethod( + 'Echo', + echo_Pre, + false, + false, + (List value) => new Record.fromBuffer(value), + (Record value) => value.writeToBuffer())); + $addMethod(new ServiceMethod( + 'AddOne', + addOne, + true, + true, + (List value) => new Number.fromBuffer(value), + (Number value) => value.writeToBuffer())); + $addMethod(new ServiceMethod( + 'Fibonacci', + fibonacci_Pre, + false, + true, + (List value) => new Empty.fromBuffer(value), + (Number value) => value.writeToBuffer())); + } + + Future echo_Pre(ServiceCall call, Future request) async { + return echo(call, await request); + } + + Stream fibonacci_Pre(ServiceCall call, Future request) async* { + yield* fibonacci(call, await request); + } + + Future echo(ServiceCall call, Record request); + Stream addOne(ServiceCall call, Stream request); + Stream fibonacci(ServiceCall call, Empty request); +} diff --git a/example/metadata/lib/src/server.dart b/example/metadata/lib/src/server.dart new file mode 100644 index 0000000..5f470c9 --- /dev/null +++ b/example/metadata/lib/src/server.dart @@ -0,0 +1,74 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:grpc/grpc.dart' as grpc; + +import 'generated/metadata.pbgrpc.dart'; + +class MetadataService extends MetadataServiceBase { + int callCount = 0; + + @override + Future echo(grpc.ServiceCall call, Record request) async { + final peer = call.clientMetadata['peer']; + final count = callCount++; + print('Echo: Call #$count: Peer: $peer, request: ${request.value}'); + call.headers['count'] = '${count}'; + call.trailers['hello'] = request.value; + + final delay = call.clientMetadata['delay']; + if (delay != null) { + await new Future.delayed(new Duration(seconds: int.parse(delay))); + } + + return new Record()..value = peer; + } + + @override + Stream addOne(grpc.ServiceCall call, Stream request) async* { + int lastNumber = -1; + try { + await for (var number in request) { + lastNumber = number.value; + yield new Number()..value = number.value + 1; + } + } catch (error) { + print('Caught: $error, last number = $lastNumber'); + } finally { + if (call.isCanceled) { + print('AddOne: Call canceled'); + } + } + } + + /// Streams a Fibonacci number every 500ms until the call is canceled. + Stream fibonacci(grpc.ServiceCall call, Empty request) async* { + int previous = 0; + int current = 1; + try { + while (true) { + await new Future.delayed(new Duration(milliseconds: 500)); + yield new Number()..value = current; + final next = current + previous; + previous = current; + current = next; + } + } finally { + if (call.isCanceled) { + print('Fibonacci: Canceled.'); + } + } + } +} + +class Server { + Future main(List args) async { + final server = new grpc.Server(port: 8080) + ..addService(new MetadataService()); + await server.serve(); + print('Server listening...'); + } +} diff --git a/example/metadata/protos/metadata.proto b/example/metadata/protos/metadata.proto new file mode 100644 index 0000000..9b6a63f --- /dev/null +++ b/example/metadata/protos/metadata.proto @@ -0,0 +1,67 @@ +// Copyright 2017, Google Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +syntax = "proto3"; + +package grpc; + +// Interface exported by the server. +service Metadata { + // Echo metadata. + // + // Echoes the given input as trailer metadata. Sets a call counter as header + // metadata, and returns the value of the 'hello' key in the client metadata + // as the result. + rpc Echo(Record) returns (Record) {} + + // Adds 1 to the numbers in the request stream. + // + // Uses bidirectional streaming. + rpc AddOne(stream Number) returns (stream Number) {} + + // Fibonacci. + // + // Streams Fibonacci numbers until the call is canceled or times out. + rpc Fibonacci(Empty) returns (stream Number) {} +} + +// A message containing a single string value. +message Record { + string value = 1; +} + +// A message containing a single number. +message Number { + int32 value = 1; +} + +// A message containing nothing. +message Empty { +} + diff --git a/example/metadata/pubspec.yaml b/example/metadata/pubspec.yaml new file mode 100644 index 0000000..9ea516e --- /dev/null +++ b/example/metadata/pubspec.yaml @@ -0,0 +1,17 @@ +name: metadata +description: Dart gRPC sample client and server. +version: 0.0.1 +homepage: https://github.com/dart-lang/grpc-dart + +environment: + sdk: '>=1.20.1 <2.0.0' + +dependencies: + async: ^1.13.3 + grpc: + path: ../../ + protobuf: ^0.5.4 + http2: ^0.1.2 + +dev_dependencies: + test: ^0.12.0 diff --git a/example/metadata/tool/regenerate.sh b/example/metadata/tool/regenerate.sh new file mode 100755 index 0000000..65e2a27 --- /dev/null +++ b/example/metadata/tool/regenerate.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +protoc --dart_out=grpc:lib/src/generated -Iprotos protos/metadata.proto +rm lib/src/generated/metadata.pb{enum,json}.dart +dartfmt -w lib/src/generated diff --git a/example/route_guide/README.md b/example/route_guide/README.md index 866857d..f49cfa6 100644 --- a/example/route_guide/README.md +++ b/example/route_guide/README.md @@ -2,8 +2,6 @@ The route guide server and client demonstrate how to use Dart gRPC libraries to perform unary, client streaming, server streaming and full duplex RPCs. -Please refer to [gRPC Basics: Dart]() for more information. - See the definition of the route guide service in `protos/route_guide.proto`. # Run the sample code diff --git a/example/route_guide/lib/src/client.dart b/example/route_guide/lib/src/client.dart index 6ae54d7..40a6cd0 100644 --- a/example/route_guide/lib/src/client.dart +++ b/example/route_guide/lib/src/client.dart @@ -15,12 +15,9 @@ class Client { ClientChannel channel; RouteGuideClient stub; - Client() { + Future main(List args) async { channel = new ClientChannel('127.0.0.1', port: 8080); stub = new RouteGuideClient(channel); - } - - Future main(List args) async { // Run all of the demos in order. await runGetFeature(); await runListFeatures(); @@ -128,9 +125,9 @@ class Client { } final call = stub.routeChat(outgoingNotes()); - await call.forEach((note) { + await for (var note in call) { print('Got message ${note.message} at ${note.location.latitude}, ${note .location.longitude}'); - }); + } } } diff --git a/lib/src/client.dart b/lib/src/client.dart index 5afab02..56ebd55 100644 --- a/lib/src/client.dart +++ b/lib/src/client.dart @@ -12,16 +12,44 @@ import 'package:http2/transport.dart'; import 'shared.dart'; import 'streams.dart'; +const _reservedHeaders = const [ + 'content-type', + 'te', + 'grpc-timeout', + 'grpc-accept-encoding', + 'user-agent', +]; + +/// Runtime options for a RPC call. +class CallOptions { + final Map metadata; + final Duration timeout; + + CallOptions._(this.metadata, this.timeout); + + factory CallOptions({Map metadata, Duration timeout}) { + final sanitizedMetadata = {}; + metadata?.forEach((key, value) { + final lowerCaseKey = key.toLowerCase(); + if (!lowerCaseKey.startsWith(':') && + !_reservedHeaders.contains(lowerCaseKey)) { + sanitizedMetadata[lowerCaseKey] = value; + } + }); + return new CallOptions._(new Map.unmodifiable(sanitizedMetadata), timeout); + } +} + /// A channel to an RPC endpoint. class ClientChannel { final String host; final int port; + final CallOptions options; final List _sockets = []; final List _connections = []; - // TODO(jakobr): Channel options. - ClientChannel(this.host, {this.port = 8080}); + ClientChannel(this.host, {this.port = 8080, this.options}); /// Returns a connection to this [Channel]'s RPC endpoint. The connection may /// be shared between multiple RPC calls. @@ -76,32 +104,65 @@ class ClientCall implements Response { StreamController _responses; StreamSubscription _responseSubscription; - final Map metadata; + final CallOptions options; - ClientCall(this._channel, this._method, {this.metadata = const {}}) { + Future _callSetup; + + ClientCall(this._channel, this._method, {this.options}) { _responses = new StreamController(onListen: _onResponseListen); - _call().catchError((error) { + _callSetup = _initiateCall().catchError((error) { _responses.addError( new GrpcError(1703, 'Error connecting: ${error.toString()}')); }); } - Future _call() async { + /// Convert [timeout] to grpc-timeout header string format. + // Mostly inspired by grpc-java implementation. + // TODO(jakobr): Modify to match grpc/core implementation instead. + static String toTimeoutString(Duration duration) { + const cutoff = 100000; + final timeout = duration.inMicroseconds; + if (timeout < 0) { + // Smallest possible timeout. + return '1n'; + } else if (timeout < cutoff) { + return '${timeout}u'; + } else if (timeout < cutoff * 1000) { + return '${timeout~/1000}m'; + } else if (timeout < cutoff * 1000 * 1000) { + return '${timeout~/1000000}S'; + } else if (timeout < cutoff * 1000 * 1000 * 60) { + return '${timeout~/60000000}M'; + } else { + return '${timeout~/3600000000}H'; + } + } + + Future _initiateCall() async { final connection = await _channel.connect(); + final timeout = options?.timeout ?? _channel.options?.timeout; // TODO(jakobr): Populate HTTP-specific headers in connection? final headers =
[ _methodPost, _schemeHttp, new Header.ascii(':path', _method.path), new Header.ascii(':authority', _channel.host), - new Header.ascii('grpc-timeout', '5S'), + ]; + if (timeout != null) { + headers.add(new Header.ascii('grpc-timeout', toTimeoutString(timeout))); + } + headers.addAll([ _contentTypeGrpc, _teTrailers, _grpcAcceptEncoding, _userAgent, - ]; - metadata.forEach((key, value) { - // TODO(jakobr): Filter out headers owned by gRPC. + ]); + // TODO(jakobr): Flip this around, and have the Channel create the call + // object and apply options (including the above TODO). + final customMetadata = {}; + customMetadata.addAll(_channel.options?.metadata ?? {}); + customMetadata.addAll(options?.metadata ?? {}); + customMetadata.forEach((key, value) { headers.add(new Header.ascii(key, value)); }); _stream = connection.makeRequest(headers); @@ -250,11 +311,16 @@ class ClientCall implements Response { @override Future cancel() async { - _stream.terminate(); - final futures = [ - _requests.close(), - _responses.close(), - ]; + _callSetup.whenComplete(() { + // Terminate the stream if the call connects after being canceled. + _stream?.terminate(); + }); + // Don't await _responses.close() here. It'll only complete once the done + // event has been delivered, and it's the caller of this function that is + // reading from responses as well, so we might end up deadlocked. + _responses.close(); + _stream?.terminate(); + final futures = [_requests.close()]; if (_responseSubscription != null) { futures.add(_responseSubscription.cancel()); } diff --git a/lib/src/server.dart b/lib/src/server.dart index 0c222d8..8fcfdb6 100644 --- a/lib/src/server.dart +++ b/lib/src/server.dart @@ -95,32 +95,36 @@ class Server { /// /// Gives the method handler access to custom metadata from the client, and /// ability to set custom metadata on the header/trailer sent to the client. -abstract class ServiceCall { +class ServiceCall { + final ServerHandler _handler; + + ServiceCall(this._handler); + /// Custom metadata from the client. - Map get clientMetadata; + Map get clientMetadata => _handler._clientMetadata; /// Custom metadata to be sent to the client. Will be [null] once the headers /// have been sent, either when [sendHeaders] is called, or when the first /// response message is sent. - Map get headers; + Map get headers => _handler._customHeaders; /// Custom metadata to be sent to the client after all response messages. - Map get trailers; + Map get trailers => _handler._customTrailers; /// Deadline for this call. If the call is still active after this time, then /// the client or server may cancel it. - DateTime get deadline; + DateTime get deadline => _handler._deadline; /// Returns [true] if the [deadline] has been exceeded. - bool get isTimedOut; + bool get isTimedOut => _handler._isTimedOut; /// Returns [true] if the client has canceled this call. - bool get isCanceled; + bool get isCanceled => _handler._isCanceled; /// Send response headers. This is done automatically before sending the first /// response message, but can be done manually before the first response is /// ready, if necessary. - void sendHeaders(); + void sendHeaders() => _handler._sendHeaders(); /// Send response trailers. A trailer indicating success ([status] == 0) will /// be sent automatically when all responses are sent. This method can be used @@ -128,7 +132,9 @@ abstract class ServiceCall { /// /// The call will be closed after calling this method, and no further /// responses can be sent. - void sendTrailer(int status, [String statusMessage]); + /// responses can be sent. + void sendTrailer(int status, [String statusMessage]) => + _handler._sendTrailers(status: status, message: statusMessage); } /// Handles an incoming gRPC call. @@ -144,12 +150,27 @@ class ServerHandler { StreamController _requests; bool _hasReceivedRequest = false; + Stream _responses; StreamSubscription _responseSubscription; bool _headersSent = false; + Map _customHeaders = {}; + Map _customTrailers = {}; + + DateTime _deadline; + bool _isCanceled = false; + ServerHandler(this._methodLookup, this._stream); + bool get isCanceled => _isCanceled; + bool get _isTimedOut => _deadline?.isBefore(new DateTime.now()) ?? false; + void handle() { + _stream.onTerminated = (int errorCode) { + _isCanceled = true; + _responseSubscription?.cancel(); + }; + _incomingSubscription = _stream.incomingMessages .transform(new GrpcHttpDecoder()) .transform(grpcDecompressor()) @@ -176,7 +197,7 @@ class ServerHandler { final method = path[2]; _descriptor = _methodLookup(service, method); if (_descriptor == null) { - _sendError(404, 'Method not found'); + _sendError(404, 'Path /$service/$method not found'); return; } _startStreamingRequest(); @@ -190,23 +211,23 @@ class ServerHandler { onResume: _incomingSubscription.resume); _incomingSubscription.onData(_onDataActive); - Stream responses; + final context = new ServiceCall(this); if (_descriptor.streamingResponse) { if (_descriptor.streamingRequest) { - responses = _descriptor.handler(null, _requests.stream); + _responses = _descriptor.handler(context, _requests.stream); } else { - responses = _descriptor.handler(null, _requests.stream.single); + _responses = _descriptor.handler(context, _requests.stream.single); } } else { Future response; if (_descriptor.streamingRequest) { - response = _descriptor.handler(null, _requests.stream); + response = _descriptor.handler(context, _requests.stream); } else { - response = _descriptor.handler(null, _requests.stream.single); + response = _descriptor.handler(context, _requests.stream.single); } - responses = response.asStream(); + _responses = response.asStream(); } - _responseSubscription = responses.listen(_onResponse, + _responseSubscription = _responses.listen(_onResponse, onError: _onResponseError, onDone: _onResponseDone, cancelOnError: true); @@ -251,9 +272,23 @@ class ServerHandler { // -- Active state, outgoing response data -- void _onResponse(response) { - _ensureHeadersSent(); - final bytes = _descriptor.responseSerializer(response); - _stream.sendData(GrpcHttpEncoder.frame(bytes)); + try { + if (!_headersSent) { + _sendHeaders(); + } + final bytes = _descriptor.responseSerializer(response); + _stream.sendData(GrpcHttpEncoder.frame(bytes)); + } catch (error) { + _responseSubscription.cancel(); + if (!_requests.isClosed) { + // If we can, alert the handler that things are going wrong. + _requests + .addError(new GrpcError(1001, 'Error sending response: $error')); + _requests.close(); + } + _incomingSubscription.cancel(); + _stream.terminate(); + } } void _onResponseDone() { @@ -269,36 +304,44 @@ class ServerHandler { } } - void _ensureHeadersSent() { - if (_headersSent) return; - _sendHeaders(); - } - void _sendHeaders() { if (_headersSent) throw new GrpcError(1514, 'Headers already sent'); - final headers = [ - new Header.ascii(':status', - 200.toString()), // TODO(jakobr): Should really be on package:http2. - new Header.ascii('content-type', 'application/grpc'), - ]; - // headers.addAll(context.headers); + final headersMap = {}; + headersMap.addAll(_customHeaders); + _customHeaders = null; + + // TODO(jakobr): Should come from package:http2? + headersMap[':status'] = '200'; + headersMap['content-type'] = 'application/grpc'; + + final headers =
[]; + headersMap + .forEach((key, value) => headers.add(new Header.ascii(key, value))); _stream.sendHeaders(headers); _headersSent = true; } void _sendTrailers({int status = 0, String message}) { - final trailers =
[]; + final trailersMap = {}; if (!_headersSent) { - trailers.addAll([ - new Header.ascii(':status', 200.toString()), - new Header.ascii('content-type', 'application/grpc'), - ]); + trailersMap.addAll(_customHeaders); + _customHeaders = null; } - trailers.add(new Header.ascii('grpc-status', status.toString())); + trailersMap.addAll(_customTrailers); + _customTrailers = null; + if (!_headersSent) { + // TODO(jakobr): Should come from package:http2? + trailersMap[':status'] = '200'; + trailersMap['content-type'] = 'application/grpc'; + } + trailersMap['grpc-status'] = status.toString(); if (message != null) { - trailers.add(new Header.ascii('grpc-message', message)); + trailersMap['grpc-message'] = message; } - // trailers.addAll(context.trailers); + + final trailers =
[]; + trailersMap + .forEach((key, value) => trailers.add(new Header.ascii(key, value))); _stream.sendHeaders(trailers, endStream: true); // We're done! _incomingSubscription.cancel(); @@ -308,9 +351,11 @@ class ServerHandler { // -- All states, incoming error / stream closed -- void _onError(error) { - print('Stream error: $error'); - // TODO(jakobr): Handle. Might be a cancel request from the client, which - // should be propagated. + // Exception from the incoming stream. Most likely a cancel request from the + // client, so we treat it as such. + _isCanceled = true; + _requests.addError(new GrpcError(1001, 'Canceled')); + _responseSubscription?.cancel(); } void _onDoneError() { @@ -327,7 +372,6 @@ class ServerHandler { } void _sendError(int status, String message) { - print('Sending error $status: $message'); _sendTrailers(status: status, message: message); } } diff --git a/test/timeout_test.dart b/test/timeout_test.dart new file mode 100644 index 0000000..0be23e1 --- /dev/null +++ b/test/timeout_test.dart @@ -0,0 +1,25 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:test/test.dart'; + +import 'package:grpc/src/client.dart'; + +void main() { + test('Timeouts are converted correctly', () { + expect(ClientCall.toTimeoutString(new Duration(microseconds: -1)), '1n'); + expect(ClientCall.toTimeoutString(new Duration(microseconds: 0)), '0u'); + expect(ClientCall.toTimeoutString(new Duration(microseconds: 107)), '107u'); + expect(ClientCall.toTimeoutString(new Duration(hours: 2, microseconds: 17)), + '7200S'); + expect(ClientCall.toTimeoutString(new Duration(milliseconds: 1420665)), + '1420S'); + expect( + ClientCall.toTimeoutString(new Duration(seconds: 2, microseconds: 3)), + '2000m'); + expect( + ClientCall.toTimeoutString(new Duration(seconds: 2, milliseconds: 3)), + '2003m'); + }); +}