diff --git a/example/googleapis/pubspec.yaml b/example/googleapis/pubspec.yaml index b62fef2..f40d296 100644 --- a/example/googleapis/pubspec.yaml +++ b/example/googleapis/pubspec.yaml @@ -7,7 +7,7 @@ environment: sdk: '>=1.24.3 <2.0.0' dependencies: - async: ^1.13.3 + async: '>=1.13.3 <3.0.0' grpc: path: ../../ protobuf: ^0.7.0 diff --git a/example/helloworld/pubspec.yaml b/example/helloworld/pubspec.yaml index e8e2044..764e574 100644 --- a/example/helloworld/pubspec.yaml +++ b/example/helloworld/pubspec.yaml @@ -7,7 +7,7 @@ environment: sdk: '>=1.24.3 <2.0.0' dependencies: - async: ^1.13.3 + async: '>=1.13.3 <3.0.0' grpc: path: ../../ protobuf: ^0.7.0 diff --git a/example/metadata/pubspec.yaml b/example/metadata/pubspec.yaml index be2e439..ad2ee4d 100644 --- a/example/metadata/pubspec.yaml +++ b/example/metadata/pubspec.yaml @@ -7,7 +7,7 @@ environment: sdk: '>=1.24.3 <2.0.0' dependencies: - async: ^1.13.3 + async: '>=1.13.3 <3.0.0' grpc: path: ../../ protobuf: ^0.7.0 diff --git a/example/route_guide/lib/src/common.dart b/example/route_guide/lib/src/common.dart index c9a0437..f8e5500 100644 --- a/example/route_guide/lib/src/common.dart +++ b/example/route_guide/lib/src/common.dart @@ -24,7 +24,7 @@ final List featuresDb = _readDatabase(); List _readDatabase() { final dbData = new File('data/route_guide_db.json').readAsStringSync(); - final List> db = JSON.decode(dbData); + final List db = JSON.decode(dbData); return db.map((entry) { final location = new Point() ..latitude = entry['location']['latitude'] diff --git a/example/route_guide/pubspec.yaml b/example/route_guide/pubspec.yaml index da8edc7..dc661a5 100644 --- a/example/route_guide/pubspec.yaml +++ b/example/route_guide/pubspec.yaml @@ -7,7 +7,7 @@ environment: sdk: '>=1.24.3 <2.0.0' dependencies: - async: ^1.13.3 + async: '>=1.13.3 <3.0.0' grpc: path: ../../ protobuf: ^0.7.0 diff --git a/interop/bin/server.dart b/interop/bin/server.dart index f26f6be..b722f77 100644 --- a/interop/bin/server.dart +++ b/interop/bin/server.dart @@ -136,6 +136,7 @@ Future main(List args) async { tlsCredentials = new ServerTlsCredentials( certificate: await certificate, privateKey: await privateKey); } - await server.serve(port: port, security: tlsCredentials); + await server.serve( + address: 'localhost', port: port, security: tlsCredentials); print('Server listening on port ${server.port}...'); } diff --git a/interop/pubspec.yaml b/interop/pubspec.yaml index bf2bc2a..37b3472 100644 --- a/interop/pubspec.yaml +++ b/interop/pubspec.yaml @@ -8,7 +8,7 @@ environment: dependencies: args: ^0.13.0 - async: ^1.13.3 + async: '>=1.13.3 <3.0.0' collection: ^1.14.2 grpc: path: ../ diff --git a/lib/src/auth/auth.dart b/lib/src/auth/auth.dart index fa53ac5..bfb4a34 100644 --- a/lib/src/auth/auth.dart +++ b/lib/src/auth/auth.dart @@ -29,7 +29,7 @@ abstract class BaseAuthenticator { auth.AccessToken _accessToken; String _lastUri; - Future authenticate(Map metadata, String uri) async { + Future authenticate(Map metadata, String uri) async { if (uri == null) { throw new GrpcError.unauthenticated( 'Credentials require secure transport.'); @@ -54,13 +54,13 @@ abstract class BaseAuthenticator { CallOptions get toCallOptions => new CallOptions(providers: [authenticate]); - Future obtainAccessCredentials(String uri); + Future obtainAccessCredentials(String uri); } abstract class HttpBasedAuthenticator extends BaseAuthenticator { - Future _call; + Future _call; - Future obtainAccessCredentials(String uri) { + Future obtainAccessCredentials(String uri) { if (_call == null) { final authClient = new http.Client(); _call = obtainCredentialsWithClient(authClient, uri).then((credentials) { @@ -72,11 +72,13 @@ abstract class HttpBasedAuthenticator extends BaseAuthenticator { return _call; } - Future obtainCredentialsWithClient(http.Client client, String uri); + Future obtainCredentialsWithClient( + http.Client client, String uri); } class ComputeEngineAuthenticator extends HttpBasedAuthenticator { - Future obtainCredentialsWithClient(http.Client client, String uri) => + Future obtainCredentialsWithClient( + http.Client client, String uri) => auth.obtainAccessCredentialsViaMetadataServer(client); } @@ -94,7 +96,8 @@ class ServiceAccountAuthenticator extends HttpBasedAuthenticator { String get projectId => _projectId; - Future obtainCredentialsWithClient(http.Client client, String uri) => + Future obtainCredentialsWithClient( + http.Client client, String uri) => auth.obtainAccessCredentialsViaServiceAccount( _serviceAccountCredentials, _scopes, client); } @@ -114,7 +117,7 @@ class JwtServiceAccountAuthenticator extends BaseAuthenticator { String get projectId => _projectId; - Future obtainAccessCredentials(String uri) async { + Future obtainAccessCredentials(String uri) async { _accessToken = _jwtTokenFor(_serviceAccountCredentials, _keyId, uri); } } diff --git a/lib/src/client/call.dart b/lib/src/client/call.dart index 8832734..f60132c 100644 --- a/lib/src/client/call.dart +++ b/lib/src/client/call.dart @@ -91,7 +91,7 @@ class ClientCall implements Response { if (options.metadataProviders.isEmpty) { _sendRequest(connection, _sanitizeMetadata(options.metadata)); } else { - final metadata = new Map.from(options.metadata); + final metadata = new Map.from(options.metadata); String audience; if (connection.options.credentials.isSecure) { final port = connection.port != 443 ? ':${connection.port}' : ''; diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index 86fb0e6..ef813b9 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -113,53 +113,13 @@ class ServerHandler extends ServiceCall { _startStreamingRequest(); } - Future _toSingleFuture(Stream stream) { - T _ensureOnlyOneRequest(T previous, T element) { - if (previous != null) { - throw new GrpcError.unimplemented('More than one request received'); - } - return element; - } - - T _ensureOneRequest(T value) { - if (value == null) - throw new GrpcError.unimplemented('No requests received'); - return value; - } - - final future = - stream.fold(null, _ensureOnlyOneRequest).then(_ensureOneRequest); - // Make sure errors on the future aren't unhandled, but return the original - // future so the request handler can also get the error. - future.catchError((_) {}); - return future; - } - void _startStreamingRequest() { _incomingSubscription.pause(); - _requests = new StreamController( - onListen: _incomingSubscription.resume, - onPause: _incomingSubscription.pause, - onResume: _incomingSubscription.resume); + _requests = _descriptor.createRequestStream(_incomingSubscription); _incomingSubscription.onData(_onDataActive); _service.$onMetadata(this); - if (_descriptor.streamingResponse) { - if (_descriptor.streamingRequest) { - _responses = _descriptor.handler(this, _requests.stream); - } else { - _responses = - _descriptor.handler(this, _toSingleFuture(_requests.stream)); - } - } else { - Future response; - if (_descriptor.streamingRequest) { - response = _descriptor.handler(this, _requests.stream); - } else { - response = _descriptor.handler(this, _toSingleFuture(_requests.stream)); - } - _responses = response.asStream(); - } + _responses = _descriptor.handle(this, _requests.stream); _responseSubscription = _responses.listen(_onResponse, onError: _onResponseError, @@ -213,7 +173,7 @@ class ServerHandler extends ServiceCall { final data = message as GrpcData; var request; try { - request = _descriptor.requestDeserializer(data.data); + request = _descriptor.deserialize(data.data); } catch (error) { final grpcError = new GrpcError.internal('Error deserializing request: $error'); @@ -231,7 +191,7 @@ class ServerHandler extends ServiceCall { void _onResponse(response) { try { - final bytes = _descriptor.responseSerializer(response); + final bytes = _descriptor.serialize(response); if (!_headersSent) { sendHeaders(); } diff --git a/lib/src/server/service.dart b/lib/src/server/service.dart index 5630686..26d869c 100644 --- a/lib/src/server/service.dart +++ b/lib/src/server/service.dart @@ -13,6 +13,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +import 'dart:async'; + +import '../shared/status.dart'; import 'call.dart'; /// Definition of a gRPC service method. @@ -34,6 +37,56 @@ class ServiceMethod { this.streamingResponse, this.requestDeserializer, this.responseSerializer); + + StreamController createRequestStream(StreamSubscription incoming) => + new StreamController( + onListen: incoming.resume, + onPause: incoming.pause, + onResume: incoming.resume); + + Q deserialize(List data) => requestDeserializer(data); + + List serialize(dynamic response) => responseSerializer(response as R); + + Stream handle(ServiceCall call, Stream requests) { + if (streamingResponse) { + if (streamingRequest) { + return handler(call, requests); + } else { + return handler(call, _toSingleFuture(requests)); + } + } else { + Future response; + if (streamingRequest) { + response = handler(call, requests); + } else { + response = handler(call, _toSingleFuture(requests)); + } + return response.asStream(); + } + } + + Future _toSingleFuture(Stream stream) { + Q _ensureOnlyOneRequest(Q previous, Q element) { + if (previous != null) { + throw new GrpcError.unimplemented('More than one request received'); + } + return element; + } + + Q _ensureOneRequest(Q value) { + if (value == null) + throw new GrpcError.unimplemented('No requests received'); + return value; + } + + final future = + stream.fold(null, _ensureOnlyOneRequest).then(_ensureOneRequest); + // Make sure errors on the future aren't unhandled, but return the original + // future so the request handler can also get the error. + future.catchError((_) {}); + return future; + } } /// Definition of a gRPC service.