diff --git a/.travis.yml b/.travis.yml index 24dde5c..cdee130 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,15 @@ language: dart -sudo: false -# Run against both the dev and channel. +# necessary to avoid chrome sandboxing issues +sudo: required +addons: + chrome: stable + +# The Chrome addon does not work on windows +install: + - if [ $TRAVIS_OS_NAME = windows ]; then choco install googlechrome ; fi + +# Run against both the dev and stable channel. dart: - stable - dev @@ -9,6 +17,7 @@ dart: # Define test tasks to run. dart_task: - test: --platform vm + - test: --platform chrome # Only run one instance of the formatter and the analyzer, rather than running # them against each Dart version. @@ -19,8 +28,9 @@ matrix: - dart: dev dart_task: dartfmt script: + - pub get - dartanalyzer lib test - - for example in example/*; do (cd $example; echo [Analyzing $example]; pub get; dartanalyzer .); done + - for example in example/*/; do (cd $example; echo [Analyzing $example]; pub get; dartanalyzer .); done - (cd interop; echo [Analyzing interop]; pub get; dartanalyzer .) diff --git a/CHANGELOG.md b/CHANGELOG.md index 361870f..ac30de2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 2.0.0 + +* Add initial support for grpc-web. + See `example/grpc-web` for an example of this working. +* **Breaking**: `grpc.dart` no longer exposes `ClientConnection`. It was supposed to be an internal + abstraction. +* **Breaking**: `grpc.dart` no longer exposes the deprecated `ServerHandler`. + It was supposed to be an internal abstraction. +* `service_api.dart` no longer exports Server - it has never been used by the generated code. + ## 1.0.3 * Allow custom user agent with a `userAgent` argument for `ChannelOptions()`. diff --git a/build.yaml b/build.yaml new file mode 100644 index 0000000..2a9825b --- /dev/null +++ b/build.yaml @@ -0,0 +1,5 @@ +targets: + $default: + sources: + exclude: + - example/** diff --git a/example/grpc-web/README.md b/example/grpc-web/README.md new file mode 100644 index 0000000..db9d82d --- /dev/null +++ b/example/grpc-web/README.md @@ -0,0 +1,62 @@ +# Description +The grpc-web example shows how to use the Dart gRPC library with a gRPC-Web capable server. + +This is meant to be used with the echo example provided by the grpc-web repository. The definition of the service is given in echo.proto. + +# Prerequistes +You will need a clone of the [grpc-web](https://github.com/grpc/grpc-web) repository to run the example server. + +You will also need the dart 'webdev' tool, which you can get by running: + +```sh +$ pub global activate webdev +``` + +# Run the sample code +Follow the instructions for starting the grpc-web example server. The simplest version of this involves running the grpc-web server in a docker container with: + +```sh +$ docker-compose up echo-server envoy +``` + +To compile and run the example, assuming you are in the root of the grpc-web +folder, i.e., .../example/grpc-web/, first get the dependencies by running: + +```sh +$ pub get +``` + +Compile and run the website with: + +```sh +$ webdev serve web:9000 +``` + +Note that the alternate port (9000) is necessary because the grpc-web server runs the grpc server on port 8080 by default (the save as webdev). + +You can then navigate to http://localhost:9000/ to try out the example. + +# Regenerate the stubs + +If you have made changes to the message or service definition in +`protos/echo.proto` and need to regenerate the corresponding Dart files, +you will need to have protoc version 3.0.0 or higher and the Dart protoc plugin +version 16.0.0 or higher on your PATH. + +To install protoc, see the instructions on +[the Protocol Buffers website](https://developers.google.com/protocol-buffers/). + +The easiest way to get the Dart protoc plugin is by running + +```sh +$ pub global activate protoc_plugin +``` + +and follow the directions to add `~/.pub-cache/bin` to your PATH, if you haven't +already done so. + +You can now regenerate the Dart files by running + +```sh +$ protoc --dart_out=grpc:lib/src/generated -Iprotos protos/echo.proto +``` diff --git a/example/grpc-web/lib/app.dart b/example/grpc-web/lib/app.dart new file mode 100644 index 0000000..48dc856 --- /dev/null +++ b/example/grpc-web/lib/app.dart @@ -0,0 +1,53 @@ +import 'dart:async'; +import 'dart:html'; + +import 'package:grpc_web/src/generated/echo.pbgrpc.dart'; + +class EchoApp { + final EchoServiceClient _service; + + EchoApp(this._service); + + Future echo(String message) async { + _addLeftMessage(message); + + try { + final response = await _service.echo(EchoRequest()..message = message); + _addRightMessage(response.message); + } catch (error) { + _addRightMessage(error.toString()); + } + } + + void repeatEcho(String message, int count) { + _addLeftMessage(message); + final request = ServerStreamingEchoRequest() + ..message = message + ..messageCount = count + ..messageInterval = 500; + _service.serverStreamingEcho(request).listen((response) { + _addRightMessage(response.message); + }, onError: (error) { + _addRightMessage(error.toString()); + }, onDone: () => print('Closed connection to server.')); + } + + void _addLeftMessage(String message) { + _addMessage(message, "label-primary pull-left"); + } + + void _addRightMessage(String message) { + _addMessage(message, "label-default pull-right"); + } + + void _addMessage(String message, String cssClass) { + final classes = cssClass.split(' '); + querySelector('#first').after(DivElement() + ..classes.add('row') + ..append(Element.tag('h2') + ..append(SpanElement() + ..classes.add('label') + ..classes.addAll(classes) + ..text = message))); + } +} diff --git a/example/grpc-web/lib/src/generated/echo.pb.dart b/example/grpc-web/lib/src/generated/echo.pb.dart new file mode 100644 index 0000000..fc59f0d --- /dev/null +++ b/example/grpc-web/lib/src/generated/echo.pb.dart @@ -0,0 +1,183 @@ +/// +// Generated code. Do not modify. +// source: echo.proto +/// +// ignore_for_file: non_constant_identifier_names,library_prefixes,unused_import + +// ignore: UNUSED_SHOWN_NAME +import 'dart:core' show int, bool, double, String, List, Map, override; + +import 'package:protobuf/protobuf.dart' as $pb; + +class EchoRequest extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo('EchoRequest', + package: const $pb.PackageName('grpc.gateway.testing')) + ..aOS(1, 'message') + ..hasRequiredFields = false; + + EchoRequest() : super(); + EchoRequest.fromBuffer(List i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) + : super.fromBuffer(i, r); + EchoRequest.fromJson(String i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) + : super.fromJson(i, r); + EchoRequest clone() => EchoRequest()..mergeFromMessage(this); + EchoRequest copyWith(void Function(EchoRequest) updates) => + super.copyWith((message) => updates(message as EchoRequest)); + $pb.BuilderInfo get info_ => _i; + static EchoRequest create() => EchoRequest(); + EchoRequest createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + static EchoRequest getDefault() => _defaultInstance ??= create()..freeze(); + static EchoRequest _defaultInstance; + static void $checkItem(EchoRequest v) { + if (v is! EchoRequest) $pb.checkItemFailed(v, _i.qualifiedMessageName); + } + + String get message => $_getS(0, ''); + set message(String v) { + $_setString(0, v); + } + + bool hasMessage() => $_has(0); + void clearMessage() => clearField(1); +} + +class EchoResponse extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo('EchoResponse', + package: const $pb.PackageName('grpc.gateway.testing')) + ..aOS(1, 'message') + ..hasRequiredFields = false; + + EchoResponse() : super(); + EchoResponse.fromBuffer(List i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) + : super.fromBuffer(i, r); + EchoResponse.fromJson(String i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) + : super.fromJson(i, r); + EchoResponse clone() => EchoResponse()..mergeFromMessage(this); + EchoResponse copyWith(void Function(EchoResponse) updates) => + super.copyWith((message) => updates(message as EchoResponse)); + $pb.BuilderInfo get info_ => _i; + static EchoResponse create() => EchoResponse(); + EchoResponse createEmptyInstance() => create(); + static $pb.PbList createRepeated() => + $pb.PbList(); + static EchoResponse getDefault() => _defaultInstance ??= create()..freeze(); + static EchoResponse _defaultInstance; + static void $checkItem(EchoResponse v) { + if (v is! EchoResponse) $pb.checkItemFailed(v, _i.qualifiedMessageName); + } + + String get message => $_getS(0, ''); + set message(String v) { + $_setString(0, v); + } + + bool hasMessage() => $_has(0); + void clearMessage() => clearField(1); +} + +class ServerStreamingEchoRequest extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo( + 'ServerStreamingEchoRequest', + package: const $pb.PackageName('grpc.gateway.testing')) + ..aOS(1, 'message') + ..a(2, 'messageCount', $pb.PbFieldType.O3) + ..a(3, 'messageInterval', $pb.PbFieldType.O3) + ..hasRequiredFields = false; + + ServerStreamingEchoRequest() : super(); + ServerStreamingEchoRequest.fromBuffer(List i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) + : super.fromBuffer(i, r); + ServerStreamingEchoRequest.fromJson(String i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) + : super.fromJson(i, r); + ServerStreamingEchoRequest clone() => + ServerStreamingEchoRequest()..mergeFromMessage(this); + ServerStreamingEchoRequest copyWith( + void Function(ServerStreamingEchoRequest) updates) => + super.copyWith( + (message) => updates(message as ServerStreamingEchoRequest)); + $pb.BuilderInfo get info_ => _i; + static ServerStreamingEchoRequest create() => ServerStreamingEchoRequest(); + ServerStreamingEchoRequest createEmptyInstance() => create(); + static $pb.PbList createRepeated() => + $pb.PbList(); + static ServerStreamingEchoRequest getDefault() => + _defaultInstance ??= create()..freeze(); + static ServerStreamingEchoRequest _defaultInstance; + static void $checkItem(ServerStreamingEchoRequest v) { + if (v is! ServerStreamingEchoRequest) + $pb.checkItemFailed(v, _i.qualifiedMessageName); + } + + String get message => $_getS(0, ''); + set message(String v) { + $_setString(0, v); + } + + bool hasMessage() => $_has(0); + void clearMessage() => clearField(1); + + int get messageCount => $_get(1, 0); + set messageCount(int v) { + $_setSignedInt32(1, v); + } + + bool hasMessageCount() => $_has(1); + void clearMessageCount() => clearField(2); + + int get messageInterval => $_get(2, 0); + set messageInterval(int v) { + $_setSignedInt32(2, v); + } + + bool hasMessageInterval() => $_has(2); + void clearMessageInterval() => clearField(3); +} + +class ServerStreamingEchoResponse extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo( + 'ServerStreamingEchoResponse', + package: const $pb.PackageName('grpc.gateway.testing')) + ..aOS(1, 'message') + ..hasRequiredFields = false; + + ServerStreamingEchoResponse() : super(); + ServerStreamingEchoResponse.fromBuffer(List i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) + : super.fromBuffer(i, r); + ServerStreamingEchoResponse.fromJson(String i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) + : super.fromJson(i, r); + ServerStreamingEchoResponse clone() => + ServerStreamingEchoResponse()..mergeFromMessage(this); + ServerStreamingEchoResponse copyWith( + void Function(ServerStreamingEchoResponse) updates) => + super.copyWith( + (message) => updates(message as ServerStreamingEchoResponse)); + $pb.BuilderInfo get info_ => _i; + static ServerStreamingEchoResponse create() => ServerStreamingEchoResponse(); + ServerStreamingEchoResponse createEmptyInstance() => create(); + static $pb.PbList createRepeated() => + $pb.PbList(); + static ServerStreamingEchoResponse getDefault() => + _defaultInstance ??= create()..freeze(); + static ServerStreamingEchoResponse _defaultInstance; + static void $checkItem(ServerStreamingEchoResponse v) { + if (v is! ServerStreamingEchoResponse) + $pb.checkItemFailed(v, _i.qualifiedMessageName); + } + + String get message => $_getS(0, ''); + set message(String v) { + $_setString(0, v); + } + + bool hasMessage() => $_has(0); + void clearMessage() => clearField(1); +} diff --git a/example/grpc-web/lib/src/generated/echo.pbenum.dart b/example/grpc-web/lib/src/generated/echo.pbenum.dart new file mode 100644 index 0000000..8da8be7 --- /dev/null +++ b/example/grpc-web/lib/src/generated/echo.pbenum.dart @@ -0,0 +1,5 @@ +/// +// Generated code. Do not modify. +// source: echo.proto +/// +// ignore_for_file: non_constant_identifier_names,library_prefixes,unused_import diff --git a/example/grpc-web/lib/src/generated/echo.pbgrpc.dart b/example/grpc-web/lib/src/generated/echo.pbgrpc.dart new file mode 100644 index 0000000..a5327c6 --- /dev/null +++ b/example/grpc-web/lib/src/generated/echo.pbgrpc.dart @@ -0,0 +1,79 @@ +/// +// Generated code. Do not modify. +// source: echo.proto +/// +// ignore_for_file: non_constant_identifier_names,library_prefixes,unused_import + +import 'dart:async' as $async; + +import 'package:grpc/service_api.dart' as $grpc; +import 'echo.pb.dart'; +export 'echo.pb.dart'; + +class EchoServiceClient extends $grpc.Client { + static final _$echo = $grpc.ClientMethod( + '/grpc.gateway.testing.EchoService/Echo', + (EchoRequest value) => value.writeToBuffer(), + (List value) => EchoResponse.fromBuffer(value)); + static final _$serverStreamingEcho = $grpc.ClientMethod< + ServerStreamingEchoRequest, ServerStreamingEchoResponse>( + '/grpc.gateway.testing.EchoService/ServerStreamingEcho', + (ServerStreamingEchoRequest value) => value.writeToBuffer(), + (List value) => ServerStreamingEchoResponse.fromBuffer(value)); + + EchoServiceClient($grpc.ClientChannel channel, {$grpc.CallOptions options}) + : super(channel, options: options); + + $grpc.ResponseFuture echo(EchoRequest request, + {$grpc.CallOptions options}) { + final call = $createCall(_$echo, $async.Stream.fromIterable([request]), + options: options); + return $grpc.ResponseFuture(call); + } + + $grpc.ResponseStream serverStreamingEcho( + ServerStreamingEchoRequest request, + {$grpc.CallOptions options}) { + final call = $createCall( + _$serverStreamingEcho, $async.Stream.fromIterable([request]), + options: options); + return $grpc.ResponseStream(call); + } +} + +abstract class EchoServiceBase extends $grpc.Service { + String get $name => 'grpc.gateway.testing.EchoService'; + + EchoServiceBase() { + $addMethod($grpc.ServiceMethod( + 'Echo', + echo_Pre, + false, + false, + (List value) => EchoRequest.fromBuffer(value), + (EchoResponse value) => value.writeToBuffer())); + $addMethod($grpc.ServiceMethod( + 'ServerStreamingEcho', + serverStreamingEcho_Pre, + false, + true, + (List value) => ServerStreamingEchoRequest.fromBuffer(value), + (ServerStreamingEchoResponse value) => value.writeToBuffer())); + } + + $async.Future echo_Pre( + $grpc.ServiceCall call, $async.Future request) async { + return echo(call, await request); + } + + $async.Stream serverStreamingEcho_Pre( + $grpc.ServiceCall call, $async.Future request) async* { + yield* serverStreamingEcho( + call, (await request) as ServerStreamingEchoRequest); + } + + $async.Future echo($grpc.ServiceCall call, EchoRequest request); + $async.Stream serverStreamingEcho( + $grpc.ServiceCall call, ServerStreamingEchoRequest request); +} diff --git a/example/grpc-web/lib/src/generated/echo.pbjson.dart b/example/grpc-web/lib/src/generated/echo.pbjson.dart new file mode 100644 index 0000000..d5b5ff3 --- /dev/null +++ b/example/grpc-web/lib/src/generated/echo.pbjson.dart @@ -0,0 +1,35 @@ +/// +// Generated code. Do not modify. +// source: echo.proto +/// +// ignore_for_file: non_constant_identifier_names,library_prefixes,unused_import + +const EchoRequest$json = { + '1': 'EchoRequest', + '2': [ + {'1': 'message', '3': 1, '4': 1, '5': 9, '10': 'message'}, + ], +}; + +const EchoResponse$json = { + '1': 'EchoResponse', + '2': [ + {'1': 'message', '3': 1, '4': 1, '5': 9, '10': 'message'}, + ], +}; + +const ServerStreamingEchoRequest$json = { + '1': 'ServerStreamingEchoRequest', + '2': [ + {'1': 'message', '3': 1, '4': 1, '5': 9, '10': 'message'}, + {'1': 'message_count', '3': 2, '4': 1, '5': 5, '10': 'messageCount'}, + {'1': 'message_interval', '3': 3, '4': 1, '5': 5, '10': 'messageInterval'}, + ], +}; + +const ServerStreamingEchoResponse$json = { + '1': 'ServerStreamingEchoResponse', + '2': [ + {'1': 'message', '3': 1, '4': 1, '5': 9, '10': 'message'}, + ], +}; diff --git a/example/grpc-web/protos/echo.proto b/example/grpc-web/protos/echo.proto new file mode 100644 index 0000000..80a9507 --- /dev/null +++ b/example/grpc-web/protos/echo.proto @@ -0,0 +1,41 @@ +// Copyright 2018 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package grpc.gateway.testing; + +message EchoRequest { + string message = 1; +} + +message EchoResponse { + string message = 1; +} + +message ServerStreamingEchoRequest { + string message = 1; + int32 message_count = 2; + int32 message_interval = 3; +} + +message ServerStreamingEchoResponse { + string message = 1; +} + +service EchoService { + rpc Echo(EchoRequest) returns (EchoResponse); + rpc ServerStreamingEcho(ServerStreamingEchoRequest) + returns (stream ServerStreamingEchoResponse); +} diff --git a/example/grpc-web/pubspec.yaml b/example/grpc-web/pubspec.yaml new file mode 100644 index 0000000..918f37f --- /dev/null +++ b/example/grpc-web/pubspec.yaml @@ -0,0 +1,15 @@ +name: grpc_web +description: Dart gRPC-Web sample client +homepage: https://github.com/dart-lang/grpc-dart + +environment: + sdk: '>=2.0.0 <3.0.0' + +dependencies: + grpc: + path: ../../ + protobuf: ^0.13.4 + +dev_dependencies: + build_runner: ^0.10.0 + build_web_compilers: '^0.4.1' diff --git a/example/grpc-web/web/index.html b/example/grpc-web/web/index.html new file mode 100644 index 0000000..a457cd2 --- /dev/null +++ b/example/grpc-web/web/index.html @@ -0,0 +1,42 @@ + + + + + + + Echo Example + + + + +
+
+
+
+ + + + +
+

Example: "Hello", "4 Hello"

+
+
+
+ + diff --git a/example/grpc-web/web/main.dart b/example/grpc-web/web/main.dart new file mode 100644 index 0000000..04d6154 --- /dev/null +++ b/example/grpc-web/web/main.dart @@ -0,0 +1,47 @@ +// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +import 'dart:html'; + +import 'package:grpc/grpc_web.dart'; +import 'package:grpc_web/app.dart'; +import 'package:grpc_web/src/generated/echo.pbgrpc.dart'; + +void main() { + final channel = GrpcWebClientChannel.xhr(Uri.parse('http://localhost:8080')); + final service = EchoServiceClient(channel); + final app = EchoApp(service); + + final button = querySelector('#send') as ButtonElement; + button.onClick.listen((e) async { + final msg = querySelector('#msg') as TextInputElement; + final value = msg.value.trim(); + msg.value = ''; + + if (value.isEmpty) return false; + + if (value.indexOf(' ') > 0) { + final countStr = value.substring(0, value.indexOf(' ')); + final count = int.tryParse(countStr); + + if (count != null) { + app.repeatEcho(value.substring(value.indexOf(' ') + 1), count); + } else { + app.echo(value); + } + } else { + app.echo(value); + } + }); +} diff --git a/lib/grpc.dart b/lib/grpc.dart index 07bdb25..6316cc2 100644 --- a/lib/grpc.dart +++ b/lib/grpc.dart @@ -13,23 +13,41 @@ // See the License for the specific language governing permissions and // limitations under the License. -export 'src/auth/auth.dart'; +export 'src/auth/auth.dart' + show + BaseAuthenticator, + HttpBasedAuthenticator, + ComputeEngineAuthenticator, + ServiceAccountAuthenticator, + JwtServiceAccountAuthenticator; -export 'src/client/call.dart'; -export 'src/client/channel.dart'; -export 'src/client/client.dart'; -export 'src/client/common.dart'; -export 'src/client/connection.dart'; -export 'src/client/method.dart'; -export 'src/client/options.dart'; +export 'src/client/call.dart' show CallOptions, ClientCall, MetadataProvider; +export 'src/client/client.dart' show Client; +export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture; +export 'src/client/connection.dart' show ConnectionState; +export 'src/client/http2_channel.dart' show ClientChannel; +export 'src/client/method.dart' show ClientMethod; +export 'src/client/options.dart' + show + defaultIdleTimeout, + BackoffStrategy, + defaultBackoffStrategy, + ChannelOptions; -export 'src/server/call.dart'; -export 'src/server/handler.dart' show ServerHandler; -export 'src/server/interceptor.dart'; -export 'src/server/server.dart'; -export 'src/server/service.dart'; +export 'src/client/transport/http2_credentials.dart' + show BadCertificateHandler, allowBadCertificates, ChannelCredentials; -export 'src/shared/security.dart'; -export 'src/shared/status.dart'; -export 'src/shared/streams.dart'; -export 'src/shared/timeout.dart'; +export 'src/server/call.dart' show ServiceCall; +export 'src/server/interceptor.dart' show Interceptor; +export 'src/server/server.dart' show ServerTlsCredentials, Server; +export 'src/server/service.dart' show ServiceMethod, Service; + +export 'src/shared/message.dart' + show GrpcMessage, GrpcMetadata, GrpcData, grpcDecompressor; + +export 'src/shared/security.dart' + show supportedAlpnProtocols, createSecurityContext; +export 'src/shared/status.dart' show StatusCode, GrpcError; +export 'src/shared/streams.dart' show GrpcHttpEncoder, GrpcHttpDecoder; + +export 'src/shared/timeout.dart' show toTimeoutString, fromTimeoutString; diff --git a/lib/grpc_web.dart b/lib/grpc_web.dart new file mode 100644 index 0000000..542bbbe --- /dev/null +++ b/lib/grpc_web.dart @@ -0,0 +1,25 @@ +// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +export 'src/auth/auth.dart' + show HttpBasedAuthenticator, JwtServiceAccountAuthenticator; + +export 'src/client/call.dart' show MetadataProvider, CallOptions; + +export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture; + +export 'src/client/web_channel.dart' show GrpcWebClientChannel; + +export 'src/shared/status.dart' show StatusCode, GrpcError; diff --git a/lib/service_api.dart b/lib/service_api.dart index 54a455a..7d7a593 100644 --- a/lib/service_api.dart +++ b/lib/service_api.dart @@ -18,11 +18,10 @@ /// Mainly intended to be imported by generated code. library service_api; +export 'src/client/call.dart' show CallOptions; export 'src/client/channel.dart' show ClientChannel; export 'src/client/client.dart' show Client; export 'src/client/common.dart' show ResponseFuture, ResponseStream; export 'src/client/method.dart' show ClientMethod; -export 'src/client/options.dart' show CallOptions; export 'src/server/call.dart' show ServiceCall; -export 'src/server/server.dart' show Server; export 'src/server/service.dart' show Service, ServiceMethod; diff --git a/lib/src/auth/auth.dart b/lib/src/auth/auth.dart index 5800a22..ada06b1 100644 --- a/lib/src/auth/auth.dart +++ b/lib/src/auth/auth.dart @@ -21,7 +21,7 @@ import 'package:googleapis_auth/src/crypto/rsa_sign.dart'; import 'package:grpc/src/shared/status.dart'; import 'package:http/http.dart' as http; -import '../client/options.dart'; +import '../client/call.dart'; const _tokenExpirationThreshold = Duration(seconds: 30); diff --git a/lib/src/client/call.dart b/lib/src/client/call.dart index ed44feb..09c5aaa 100644 --- a/lib/src/client/call.dart +++ b/lib/src/client/call.dart @@ -1,4 +1,4 @@ -// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file +// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file // for details. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,15 +15,14 @@ import 'dart:async'; -import 'package:http2/transport.dart'; - +import '../shared/message.dart'; import '../shared/status.dart'; -import '../shared/streams.dart'; import 'common.dart'; import 'connection.dart'; import 'method.dart'; import 'options.dart'; +import 'transport/transport.dart'; const _reservedHeaders = [ 'content-type', @@ -33,6 +32,55 @@ const _reservedHeaders = [ 'user-agent', ]; +/// Provides per-RPC metadata. +/// +/// Metadata providers will be invoked for every RPC, and can add their own +/// metadata to the RPC. If the function returns a [Future], the RPC will await +/// completion of the returned [Future] before transmitting the request. +/// +/// The metadata provider is given the current [metadata] map (possibly modified +/// by previous metadata providers) and the [uri] that is being called, and is +/// expected to modify the map before returning or before completing the +/// returned [Future]. +typedef FutureOr MetadataProvider( + Map metadata, String uri); + +/// Runtime options for an RPC. +class CallOptions { + final Map metadata; + final Duration timeout; + final List metadataProviders; + + CallOptions._(this.metadata, this.timeout, this.metadataProviders); + + /// Creates a [CallOptions] object. + /// + /// [CallOptions] can specify static [metadata], set the [timeout], and + /// configure per-RPC metadata [providers]. The metadata [providers] are + /// invoked in order for every RPC, and can modify the outgoing metadata + /// (including metadata provided by previous providers). + factory CallOptions( + {Map metadata, + Duration timeout, + List providers}) { + return CallOptions._(Map.unmodifiable(metadata ?? {}), timeout, + List.unmodifiable(providers ?? [])); + } + + factory CallOptions.from(Iterable options) => + options.fold(CallOptions(), (p, o) => p.mergedWith(o)); + + CallOptions mergedWith(CallOptions other) { + if (other == null) return this; + final mergedMetadata = Map.from(metadata)..addAll(other.metadata); + final mergedTimeout = other.timeout ?? timeout; + final mergedProviders = List.from(metadataProviders) + ..addAll(other.metadataProviders); + return CallOptions._(Map.unmodifiable(mergedMetadata), mergedTimeout, + List.unmodifiable(mergedProviders)); + } +} + /// An active call to a gRPC endpoint. class ClientCall implements Response { final ClientMethod _method; @@ -45,9 +93,9 @@ class ClientCall implements Response { Map _headerMetadata; - TransportStream _stream; + GrpcTransportStream _stream; StreamController _responses; - StreamSubscription _requestSubscription; + StreamSubscription> _requestSubscription; StreamSubscription _responseSubscription; bool isCancelled = false; @@ -60,8 +108,6 @@ class ClientCall implements Response { } } - String get path => _method.path; - void onConnectionError(error) { _terminateWithError(GrpcError.unavailable('Error connecting: $error')); } @@ -85,6 +131,16 @@ class ClientCall implements Response { return sanitizedMetadata; } + +// TODO(sigurdm): Find out why we do this. + static String audiencePath(ClientMethod method) { + final lastSlashPos = method.path.lastIndexOf('/'); + return lastSlashPos == -1 + ? method.path + : method.path.substring(0, lastSlashPos); + } + + void onConnectionReady(ClientConnection connection) { if (isCancelled) return; @@ -92,16 +148,10 @@ class ClientCall implements Response { _sendRequest(connection, _sanitizeMetadata(options.metadata)); } else { final metadata = Map.from(options.metadata); - String audience; - if (connection.options.credentials.isSecure) { - final port = connection.port != 443 ? ':${connection.port}' : ''; - final lastSlashPos = path.lastIndexOf('/'); - final audiencePath = - lastSlashPos == -1 ? path : path.substring(0, lastSlashPos); - audience = 'https://${connection.authority}$port$audiencePath'; - } - Future.forEach(options.metadataProviders, - (provider) => provider(metadata, audience)) + Future.forEach( + options.metadataProviders, + (provider) => provider( + metadata, "${connection.authority}${audiencePath(_method)}")) .then((_) => _sendRequest(connection, _sanitizeMetadata(metadata))) .catchError(_onMetadataProviderError); } @@ -113,15 +163,14 @@ class ClientCall implements Response { void _sendRequest(ClientConnection connection, Map metadata) { try { - _stream = connection.makeRequest(path, options.timeout, metadata); + _stream = connection.makeRequest( + _method.path, options.timeout, metadata, _onRequestError); } catch (e) { _terminateWithError(GrpcError.unavailable('Error making call: $e')); return; } _requestSubscription = _requests .map(_method.requestSerializer) - .map(GrpcHttpEncoder.frame) - .map((bytes) => DataStreamMessage(bytes)) .handleError(_onRequestError) .listen(_stream.outgoingMessages.add, onError: _stream.outgoingMessages.addError, @@ -143,13 +192,10 @@ class ClientCall implements Response { if (_stream != null && _responses.hasListener && _responseSubscription == null) { - _responseSubscription = _stream.incomingMessages - .transform(GrpcHttpDecoder()) - .transform(grpcDecompressor()) - .listen(_onResponseData, - onError: _onResponseError, - onDone: _onResponseDone, - cancelOnError: true); + _responseSubscription = _stream.incomingMessages.listen(_onResponseData, + onError: _onResponseError, + onDone: _onResponseDone, + cancelOnError: true); if (_responses.isPaused) { _responseSubscription.pause(); } @@ -252,7 +298,7 @@ class ClientCall implements Response { /// Error handler for the requests stream. Something went wrong while trying /// to send the request to the server. Abort the request, and forward the /// error to the user code on the [_responses] stream. - void _onRequestError(error) { + void _onRequestError(error, [StackTrace stackTrace]) { if (error is! GrpcError) { error = GrpcError.unknown(error.toString()); } @@ -299,7 +345,9 @@ class ClientCall implements Response { await Future.wait(futures); } - Future _safeTerminate() { - return _terminate().catchError((_) {}); + Future _safeTerminate() async { + try { + await _terminate(); + } catch (_) {} } } diff --git a/lib/src/client/channel.dart b/lib/src/client/channel.dart index beda154..a7c4a5a 100644 --- a/lib/src/client/channel.dart +++ b/lib/src/client/channel.dart @@ -20,54 +20,59 @@ import '../shared/status.dart'; import 'call.dart'; import 'connection.dart'; import 'method.dart'; -import 'options.dart'; /// A channel to a virtual RPC endpoint. -/// -/// For each RPC, the channel picks a [ClientConnection] to dispatch the call. -/// RPCs on the same channel may be sent to different connections, depending on -/// load balancing settings. -class ClientChannel { - final String host; - final int port; - final ChannelOptions options; +abstract class ClientChannel { + /// Shuts down this channel. + /// + /// No further RPCs can be made on this channel. RPCs already in progress will + /// be allowed to complete. + Future shutdown(); + /// Terminates this channel. + /// + /// RPCs already in progress will be terminated. No further RPCs can be made + /// on this channel. + Future terminate(); + + /// Initiates a new RPC on this connection. + ClientCall createCall( + ClientMethod method, Stream requests, CallOptions options); +} + +/// Auxiliary base class implementing much of ClientChannel. +abstract class ClientChannelBase implements ClientChannel { // TODO(jakobr): Multiple connections, load balancing. ClientConnection _connection; bool _isShutdown = false; - ClientChannel(this.host, - {this.port = 443, this.options = const ChannelOptions()}); + ClientChannelBase(); - /// Shuts down this channel. - /// - /// No further RPCs can be made on this channel. RPCs already in progress will - /// be allowed to complete. + @override Future shutdown() async { if (_isShutdown) return; _isShutdown = true; if (_connection != null) await _connection.shutdown(); } - /// Terminates this channel. - /// - /// RPCs already in progress will be terminated. No further RPCs can be made - /// on this channel. + @override Future terminate() async { _isShutdown = true; if (_connection != null) await _connection.terminate(); } + ClientConnection createConnection(); + /// Returns a connection to this [Channel]'s RPC endpoint. /// /// The connection may be shared between multiple RPCs. Future getConnection() async { if (_isShutdown) throw GrpcError.unavailable('Channel shutting down.'); - return _connection ??= ClientConnection(host, port, options); + return _connection ??= createConnection(); } - /// Initiates a new RPC on this connection. + @override ClientCall createCall( ClientMethod method, Stream requests, CallOptions options) { final call = ClientCall(method, requests, options); diff --git a/lib/src/client/client.dart b/lib/src/client/client.dart index 71c49b3..aceda43 100644 --- a/lib/src/client/client.dart +++ b/lib/src/client/client.dart @@ -18,7 +18,6 @@ import 'dart:async'; import 'call.dart'; import 'channel.dart'; import 'method.dart'; -import 'options.dart'; /// Base class for client stubs. class Client { diff --git a/lib/src/client/connection.dart b/lib/src/client/connection.dart index f9a7df1..b6c829d 100644 --- a/lib/src/client/connection.dart +++ b/lib/src/client/connection.dart @@ -14,16 +14,10 @@ // limitations under the License. import 'dart:async'; -import 'dart:convert'; -import 'dart:io'; - -import 'package:http2/transport.dart'; -import 'package:meta/meta.dart'; - -import '../shared/timeout.dart'; import 'call.dart'; -import 'options.dart'; + +import 'transport/transport.dart'; enum ConnectionState { /// Actively trying to connect. @@ -42,245 +36,25 @@ enum ConnectionState { shutdown } -/// A connection to a single RPC endpoint. -/// -/// RPCs made on a connection are always sent to the same endpoint. -class ClientConnection { - static final _methodPost = Header.ascii(':method', 'POST'); - static final _schemeHttp = Header.ascii(':scheme', 'http'); - static final _schemeHttps = Header.ascii(':scheme', 'https'); - static final _contentTypeGrpc = - Header.ascii('content-type', 'application/grpc'); - static final _teTrailers = Header.ascii('te', 'trailers'); - static final _grpcAcceptEncoding = - Header.ascii('grpc-accept-encoding', 'identity'); +abstract class ClientConnection { + String get authority; - final String host; - final int port; - final ChannelOptions options; + /// Put [call] on the queue to be dispatched when the connection is ready. + void dispatchCall(ClientCall call); - ConnectionState _state = ConnectionState.idle; - void Function(ClientConnection connection) onStateChanged; - final _pendingCalls = []; - - ClientTransportConnection _transport; - - /// Used for idle and reconnect timeout, depending on [_state]. - Timer _timer; - Duration _currentReconnectDelay; - - ClientConnection(this.host, this.port, this.options); - - ConnectionState get state => _state; - - static List
createCallHeaders(bool useTls, String authority, - String path, Duration timeout, Map metadata, - {String userAgent}) { - final headers = [ - _methodPost, - useTls ? _schemeHttps : _schemeHttp, - Header(ascii.encode(':path'), utf8.encode(path)), - Header(ascii.encode(':authority'), utf8.encode(authority)), - ]; - if (timeout != null) { - headers.add(Header.ascii('grpc-timeout', toTimeoutString(timeout))); - } - headers.addAll([ - _contentTypeGrpc, - _teTrailers, - _grpcAcceptEncoding, - Header.ascii('user-agent', userAgent ?? defaultUserAgent), - ]); - metadata?.forEach((key, value) { - headers.add(Header(ascii.encode(key), utf8.encode(value))); - }); - return headers; - } - - String get authority => options.credentials.authority ?? host; - - @visibleForTesting - Future connectTransport() async { - final securityContext = options.credentials.securityContext; - - var socket = await Socket.connect(host, port); - if (_state == ConnectionState.shutdown) { - socket.destroy(); - throw 'Shutting down'; - } - if (securityContext != null) { - socket = await SecureSocket.secure(socket, - host: authority, - context: securityContext, - onBadCertificate: _validateBadCertificate); - if (_state == ConnectionState.shutdown) { - socket.destroy(); - throw 'Shutting down'; - } - } - socket.done.then(_handleSocketClosed); - return ClientTransportConnection.viaSocket(socket); - } - - bool _validateBadCertificate(X509Certificate certificate) { - final validator = options.credentials.onBadCertificate; - if (validator == null) return false; - return validator(certificate, authority); - } - - void _connect() { - if (_state != ConnectionState.idle && - _state != ConnectionState.transientFailure) { - return; - } - _setState(ConnectionState.connecting); - connectTransport().then((transport) { - _currentReconnectDelay = null; - _transport = transport; - _transport.onActiveStateChanged = _handleActiveStateChanged; - _setState(ConnectionState.ready); - _pendingCalls.forEach(_startCall); - _pendingCalls.clear(); - }).catchError(_handleConnectionFailure); - } - - void dispatchCall(ClientCall call) { - switch (_state) { - case ConnectionState.ready: - _startCall(call); - break; - case ConnectionState.shutdown: - _shutdownCall(call); - break; - default: - _pendingCalls.add(call); - if (_state == ConnectionState.idle) { - _connect(); - } - } - } - - ClientTransportStream makeRequest( - String path, Duration timeout, Map metadata) { - final headers = createCallHeaders( - options.credentials.isSecure, authority, path, timeout, metadata, - userAgent: options.userAgent); - return _transport.makeRequest(headers); - } - - void _startCall(ClientCall call) { - if (call.isCancelled) return; - call.onConnectionReady(this); - } - - void _failCall(ClientCall call, dynamic error) { - if (call.isCancelled) return; - call.onConnectionError(error); - } - - void _shutdownCall(ClientCall call) { - _failCall(call, 'Connection shutting down.'); - } + /// Start a request for [path] with [metadata]. + GrpcTransportStream makeRequest(String path, Duration timeout, + Map metadata, ErrorHandler onRequestFailure); /// Shuts down this connection. /// /// No further calls may be made on this connection, but existing calls /// are allowed to finish. - Future shutdown() async { - if (_state == ConnectionState.shutdown) return null; - _setShutdownState(); - await _transport?.finish(); - } + Future shutdown(); /// Terminates this connection. /// /// All open calls are terminated immediately, and no further calls may be /// made on this connection. - Future terminate() async { - _setShutdownState(); - await _transport?.terminate(); - } - - void _setShutdownState() { - _setState(ConnectionState.shutdown); - _cancelTimer(); - _pendingCalls.forEach(_shutdownCall); - _pendingCalls.clear(); - } - - void _setState(ConnectionState state) { - _state = state; - if (onStateChanged != null) { - onStateChanged(this); - } - } - - void _handleIdleTimeout() { - if (_timer == null || _state != ConnectionState.ready) return; - _cancelTimer(); - _transport?.finish()?.catchError((_) => {}); // TODO(jakobr): Log error. - _transport = null; - _setState(ConnectionState.idle); - } - - void _cancelTimer() { - _timer?.cancel(); - _timer = null; - } - - void _handleActiveStateChanged(bool isActive) { - if (isActive) { - _cancelTimer(); - } else { - if (options.idleTimeout != null) { - _timer ??= Timer(options.idleTimeout, _handleIdleTimeout); - } - } - } - - bool _hasPendingCalls() { - // Get rid of pending calls that have timed out. - _pendingCalls.removeWhere((call) => call.isCancelled); - return _pendingCalls.isNotEmpty; - } - - void _handleConnectionFailure(error) { - _transport = null; - if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) { - return; - } - // TODO(jakobr): Log error. - _cancelTimer(); - _pendingCalls.forEach((call) => _failCall(call, error)); - _pendingCalls.clear(); - _setState(ConnectionState.idle); - } - - void _handleReconnect() { - if (_timer == null || _state != ConnectionState.transientFailure) return; - _cancelTimer(); - _connect(); - } - - void _handleSocketClosed(_) { - _cancelTimer(); - _transport = null; - - if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) { - // All good. - return; - } - - // We were not planning to close the socket. - if (!_hasPendingCalls()) { - // No pending calls. Just hop to idle, and wait for a new RPC. - _setState(ConnectionState.idle); - return; - } - - // We have pending RPCs. Reconnect after backoff delay. - _setState(ConnectionState.transientFailure); - _currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay); - _timer = Timer(_currentReconnectDelay, _handleReconnect); - } + Future terminate(); } diff --git a/lib/src/client/http2_channel.dart b/lib/src/client/http2_channel.dart new file mode 100644 index 0000000..316c117 --- /dev/null +++ b/lib/src/client/http2_channel.dart @@ -0,0 +1,40 @@ +// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'channel.dart'; +import 'connection.dart'; +import 'http2_connection.dart' show Http2ClientConnection; +import 'options.dart'; +import 'transport/http2_credentials.dart'; + +/// A channel to a virtual gRPC endpoint. +/// +/// For each RPC, the channel picks a [Http2ClientConnection] to dispatch the call. +/// RPCs on the same channel may be sent to different connections, depending on +/// load balancing settings. +class ClientChannel extends ClientChannelBase { + final String host; + final int port; + final ChannelOptions options; + + ClientChannel(this.host, + {this.port = 443, this.options = const ChannelOptions()}) + : super(); + + @override + ClientConnection createConnection() { + return Http2ClientConnection(host, port, options); + } +} diff --git a/lib/src/client/http2_connection.dart b/lib/src/client/http2_connection.dart new file mode 100644 index 0000000..80e5c5b --- /dev/null +++ b/lib/src/client/http2_connection.dart @@ -0,0 +1,275 @@ +// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:convert'; +import 'dart:io'; + +import 'package:http2/transport.dart'; +import 'package:meta/meta.dart'; + +import '../shared/timeout.dart'; + +import 'call.dart'; +import 'connection.dart' hide ClientConnection; +import 'connection.dart' as connection; + +import 'options.dart'; +import 'transport/http2_credentials.dart'; +import 'transport/http2_transport.dart'; +import 'transport/transport.dart'; + +class Http2ClientConnection implements connection.ClientConnection { + static final _methodPost = Header.ascii(':method', 'POST'); + static final _schemeHttp = Header.ascii(':scheme', 'http'); + static final _schemeHttps = Header.ascii(':scheme', 'https'); + static final _contentTypeGrpc = + Header.ascii('content-type', 'application/grpc'); + static final _teTrailers = Header.ascii('te', 'trailers'); + static final _grpcAcceptEncoding = + Header.ascii('grpc-accept-encoding', 'identity'); + + final ChannelOptions options; + + connection.ConnectionState _state = ConnectionState.idle; + + @visibleForTesting + void Function(Http2ClientConnection connection) onStateChanged; + final _pendingCalls = []; + + ClientTransportConnection _transportConnection; + + /// Used for idle and reconnect timeout, depending on [_state]. + Timer _timer; + Duration _currentReconnectDelay; + + final String host; + final int port; + + Http2ClientConnection(this.host, this.port, this.options); + + ChannelCredentials get credentials => options.credentials; + + String get authority => + options.credentials.authority ?? port == 443 ? host : "$host:$port"; + + ConnectionState get state => _state; + + Future connectTransport() async { + var socket = await Socket.connect(host, port); + if (_state == ConnectionState.shutdown) { + socket.destroy(); + // TODO(sigurdm): Throw something nicer... + throw 'Shutting down'; + } + final securityContext = credentials.securityContext; + if (securityContext != null) { + socket = await SecureSocket.secure(socket, + host: authority, + context: securityContext, + onBadCertificate: _validateBadCertificate); + if (_state == ConnectionState.shutdown) { + socket.destroy(); + // TODO(sigurdm): Throw something nicer... + throw 'Shutting down'; + } + } + socket.done.then((_) => _handleSocketClosed()); + return ClientTransportConnection.viaSocket(socket); + } + + void _connect() { + if (_state != ConnectionState.idle && + _state != ConnectionState.transientFailure) { + return; + } + _setState(ConnectionState.connecting); + connectTransport().then((transport) { + _currentReconnectDelay = null; + _transportConnection = transport; + transport.onActiveStateChanged = _handleActiveStateChanged; + _setState(ConnectionState.ready); + _pendingCalls.forEach(_startCall); + _pendingCalls.clear(); + }).catchError(_handleConnectionFailure); + } + + void dispatchCall(ClientCall call) { + switch (_state) { + case ConnectionState.ready: + _startCall(call); + break; + case ConnectionState.shutdown: + _shutdownCall(call); + break; + default: + _pendingCalls.add(call); + if (_state == ConnectionState.idle) { + _connect(); + } + } + } + + GrpcTransportStream makeRequest(String path, Duration timeout, + Map metadata, ErrorHandler onRequestFailure) { + final headers = createCallHeaders( + credentials.isSecure, authority, path, timeout, metadata, + userAgent: options.userAgent); + final stream = _transportConnection.makeRequest(headers); + return Http2TransportStream(stream, onRequestFailure); + } + + void _startCall(ClientCall call) { + if (call.isCancelled) return; + call.onConnectionReady(this); + } + + void _failCall(ClientCall call, dynamic error) { + if (call.isCancelled) return; + call.onConnectionError(error); + } + + void _shutdownCall(ClientCall call) { + _failCall(call, 'Connection shutting down.'); + } + + Future shutdown() async { + if (_state == ConnectionState.shutdown) return null; + _setShutdownState(); + await _transportConnection?.finish(); + } + + Future terminate() async { + _setShutdownState(); + await _transportConnection?.terminate(); + } + + void _setShutdownState() { + _setState(ConnectionState.shutdown); + _cancelTimer(); + _pendingCalls.forEach(_shutdownCall); + _pendingCalls.clear(); + } + + void _setState(ConnectionState state) { + _state = state; + if (onStateChanged != null) { + onStateChanged(this); + } + } + + void _handleIdleTimeout() { + if (_timer == null || _state != ConnectionState.ready) return; + _cancelTimer(); + _transportConnection + ?.finish() + ?.catchError((_) => {}); // TODO(jakobr): Log error. + _transportConnection = null; + _setState(ConnectionState.idle); + } + + void _cancelTimer() { + _timer?.cancel(); + _timer = null; + } + + void _handleActiveStateChanged(bool isActive) { + if (isActive) { + _cancelTimer(); + } else { + if (options.idleTimeout != null) { + _timer ??= Timer(options.idleTimeout, _handleIdleTimeout); + } + } + } + + bool _hasPendingCalls() { + // Get rid of pending calls that have timed out. + _pendingCalls.removeWhere((call) => call.isCancelled); + return _pendingCalls.isNotEmpty; + } + + void _handleConnectionFailure(error) { + _transportConnection = null; + if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) { + return; + } + // TODO(jakobr): Log error. + _cancelTimer(); + _pendingCalls.forEach((call) => _failCall(call, error)); + _pendingCalls.clear(); + _setState(ConnectionState.idle); + } + + void _handleReconnect() { + if (_timer == null || _state != ConnectionState.transientFailure) return; + _cancelTimer(); + _connect(); + } + + void _handleSocketClosed() { + _cancelTimer(); + _transportConnection = null; + + if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) { + // All good. + return; + } + + // We were not planning to close the socket. + if (!_hasPendingCalls()) { + // No pending calls. Just hop to idle, and wait for a new RPC. + _setState(ConnectionState.idle); + return; + } + + // We have pending RPCs. Reconnect after backoff delay. + _setState(ConnectionState.transientFailure); + _currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay); + _timer = Timer(_currentReconnectDelay, _handleReconnect); + } + + static List
createCallHeaders(bool useTls, String authority, + String path, Duration timeout, Map metadata, + {String userAgent}) { + final headers = [ + _methodPost, + useTls ? _schemeHttps : _schemeHttp, + Header(ascii.encode(':path'), utf8.encode(path)), + Header(ascii.encode(':authority'), utf8.encode(authority)), + ]; + if (timeout != null) { + headers.add(Header.ascii('grpc-timeout', toTimeoutString(timeout))); + } + headers.addAll([ + _contentTypeGrpc, + _teTrailers, + _grpcAcceptEncoding, + Header.ascii('user-agent', userAgent ?? defaultUserAgent), + ]); + metadata?.forEach((key, value) { + headers.add(Header(ascii.encode(key), utf8.encode(value))); + }); + return headers; + } + + bool _validateBadCertificate(X509Certificate certificate) { + final credentials = this.credentials; + final validator = credentials.onBadCertificate; + + if (validator == null) return false; + return validator(certificate, authority); + } +} diff --git a/lib/src/client/options.dart b/lib/src/client/options.dart index b8c145c..f9a5413 100644 --- a/lib/src/client/options.dart +++ b/lib/src/client/options.dart @@ -13,20 +13,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -import 'dart:async'; -import 'dart:io'; - import 'dart:math'; - -import '../shared/security.dart'; +import 'transport/http2_credentials.dart'; const defaultIdleTimeout = Duration(minutes: 5); -const defaultUserAgent = 'dart-grpc/1.0.3'; +const defaultUserAgent = 'dart-grpc/2.0.0'; -typedef BackoffStrategy = Duration Function(Duration lastBackoff); +typedef Duration BackoffStrategy(Duration lastBackoff); // Backoff algorithm from https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md -const _minConnectTimeout = Duration(seconds: 20); const _initialBackoff = Duration(seconds: 1); const _maxBackoff = Duration(seconds: 120); const _multiplier = 1.6; @@ -40,58 +35,6 @@ Duration defaultBackoffStrategy(Duration lastBackoff) { return nextBackoff < _maxBackoff ? nextBackoff : _maxBackoff; } -/// Handler for checking certificates that fail validation. If this handler -/// returns `true`, the bad certificate is allowed, and the TLS handshake can -/// continue. If the handler returns `false`, the TLS handshake fails, and the -/// connection is aborted. -typedef BadCertificateHandler = bool Function( - X509Certificate certificate, String host); - -/// Bad certificate handler that disables all certificate checks. -/// DO NOT USE IN PRODUCTION! -/// Can be used during development and testing to accept self-signed -/// certificates, etc. -bool allowBadCertificates(X509Certificate certificate, String host) => true; - -/// Options controlling TLS security settings on a [ClientChannel]. -class ChannelCredentials { - final bool isSecure; - final List _certificateBytes; - final String _certificatePassword; - final String authority; - final BadCertificateHandler onBadCertificate; - - const ChannelCredentials._(this.isSecure, this._certificateBytes, - this._certificatePassword, this.authority, this.onBadCertificate); - - /// Disable TLS. RPCs are sent in clear text. - const ChannelCredentials.insecure({String authority}) - : this._(false, null, null, authority, null); - - /// Enable TLS and optionally specify the [certificates] to trust. If - /// [certificates] is not provided, the default trust store is used. - const ChannelCredentials.secure( - {List certificates, - String password, - String authority, - BadCertificateHandler onBadCertificate}) - : this._(true, certificates, password, authority, onBadCertificate); - - SecurityContext get securityContext { - if (!isSecure) return null; - if (_certificateBytes != null) { - return createSecurityContext(false) - ..setTrustedCertificatesBytes(_certificateBytes, - password: _certificatePassword); - } - final context = SecurityContext(withTrustedRoots: true); - if (SecurityContext.alpnSupported) { - context.setAlpnProtocols(supportedAlpnProtocols, false); - } - return context; - } -} - /// Options controlling how connections are made on a [ClientChannel]. class ChannelOptions { final ChannelCredentials credentials; @@ -109,52 +52,3 @@ class ChannelOptions { this.userAgent = userAgent ?? defaultUserAgent, this.backoffStrategy = backoffStrategy ?? defaultBackoffStrategy; } - -/// Provides per-RPC metadata. -/// -/// Metadata providers will be invoked for every RPC, and can add their own -/// metadata to the RPC. If the function returns a [Future], the RPC will await -/// completion of the returned [Future] before transmitting the request. -/// -/// The metadata provider is given the current [metadata] map (possibly modified -/// by previous metadata providers) and the [uri] that is being called, and is -/// expected to modify the map before returning or before completing the -/// returned [Future]. -typedef MetadataProvider = FutureOr Function( - Map metadata, String uri); - -/// Runtime options for an RPC. -class CallOptions { - final Map metadata; - final Duration timeout; - final List metadataProviders; - - CallOptions._(this.metadata, this.timeout, this.metadataProviders); - - /// Creates a [CallOptions] object. - /// - /// [CallOptions] can specify static [metadata], set the [timeout], and - /// configure per-RPC metadata [providers]. The metadata [providers] are - /// invoked in order for every RPC, and can modify the outgoing metadata - /// (including metadata provided by previous providers). - factory CallOptions( - {Map metadata, - Duration timeout, - List providers}) { - return CallOptions._(Map.unmodifiable(metadata ?? {}), timeout, - List.unmodifiable(providers ?? [])); - } - - factory CallOptions.from(Iterable options) => - options.fold(CallOptions(), (p, o) => p.mergedWith(o)); - - CallOptions mergedWith(CallOptions other) { - if (other == null) return this; - final mergedMetadata = Map.from(metadata)..addAll(other.metadata); - final mergedTimeout = other.timeout ?? timeout; - final mergedProviders = List.from(metadataProviders) - ..addAll(other.metadataProviders); - return CallOptions._(Map.unmodifiable(mergedMetadata), mergedTimeout, - List.unmodifiable(mergedProviders)); - } -} diff --git a/lib/src/client/transport/http2_credentials.dart b/lib/src/client/transport/http2_credentials.dart new file mode 100644 index 0000000..b51c41a --- /dev/null +++ b/lib/src/client/transport/http2_credentials.dart @@ -0,0 +1,68 @@ +// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:io'; + +import '../../shared/security.dart'; +import '../options.dart' as options; + +/// Handler for checking certificates that fail validation. If this handler +/// returns `true`, the bad certificate is allowed, and the TLS handshake can +/// continue. If the handler returns `false`, the TLS handshake fails, and the +/// connection is aborted. +typedef bool BadCertificateHandler(X509Certificate certificate, String host); + +/// Bad certificate handler that disables all certificate checks. +/// DO NOT USE IN PRODUCTION! +/// Can be used during development and testing to accept self-signed +/// certificates, etc. +bool allowBadCertificates(X509Certificate certificate, String host) => true; + +/// Options controlling TLS security settings on a [ClientChannel]. +class ChannelCredentials { + final bool isSecure; + final String authority; + final List _certificateBytes; + final String _certificatePassword; + final BadCertificateHandler onBadCertificate; + + const ChannelCredentials._(this.isSecure, this._certificateBytes, + this._certificatePassword, this.authority, this.onBadCertificate); + + /// Disable TLS. RPCs are sent in clear text. + const ChannelCredentials.insecure({String authority}) + : this._(false, null, null, authority, null); + + /// Enable TLS and optionally specify the [certificates] to trust. If + /// [certificates] is not provided, the default trust store is used. + const ChannelCredentials.secure( + {List certificates, + String password, + String authority, + BadCertificateHandler onBadCertificate}) + : this._(true, certificates, password, authority, onBadCertificate); + + SecurityContext get securityContext { + if (!isSecure) return null; + if (_certificateBytes != null) { + return createSecurityContext(false) + ..setTrustedCertificatesBytes(_certificateBytes, + password: _certificatePassword); + } + final context = new SecurityContext(withTrustedRoots: true); + context.setAlpnProtocols(supportedAlpnProtocols, false); + return context; + } +} diff --git a/lib/src/client/transport/http2_transport.dart b/lib/src/client/transport/http2_transport.dart new file mode 100644 index 0000000..b7cf8e8 --- /dev/null +++ b/lib/src/client/transport/http2_transport.dart @@ -0,0 +1,52 @@ +// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +import 'package:http2/transport.dart'; + +import '../../shared/message.dart'; +import '../../shared/streams.dart'; + +import 'transport.dart'; + +class Http2TransportStream extends GrpcTransportStream { + final TransportStream _transportStream; + final Stream incomingMessages; + final StreamController> _outgoingMessages = StreamController(); + final ErrorHandler _onError; + + StreamSink> get outgoingMessages => _outgoingMessages.sink; + + Http2TransportStream(this._transportStream, this._onError) + : incomingMessages = _transportStream.incomingMessages + .transform(GrpcHttpDecoder()) + .transform(grpcDecompressor()) { + _outgoingMessages.stream + .map(frame) + .map((bytes) => DataStreamMessage(bytes)) + .handleError(_onError) + .listen(_transportStream.outgoingMessages.add, + onError: _transportStream.outgoingMessages.addError, + onDone: _transportStream.outgoingMessages.close, + cancelOnError: true); + } + + @override + Future terminate() async { + await _outgoingMessages.close(); + _transportStream.terminate(); + } +} diff --git a/lib/src/client/transport/transport.dart b/lib/src/client/transport/transport.dart new file mode 100644 index 0000000..dff90b2 --- /dev/null +++ b/lib/src/client/transport/transport.dart @@ -0,0 +1,29 @@ +// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +import '../../shared/message.dart'; + +typedef void SocketClosedHandler(); +typedef void ActiveStateHandler(bool isActive); +typedef void ErrorHandler(error); + +abstract class GrpcTransportStream { + Stream get incomingMessages; + StreamSink> get outgoingMessages; + + Future terminate(); +} diff --git a/lib/src/client/transport/web_streams.dart b/lib/src/client/transport/web_streams.dart new file mode 100644 index 0000000..4159ead --- /dev/null +++ b/lib/src/client/transport/web_streams.dart @@ -0,0 +1,157 @@ +// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:convert'; +import 'dart:math'; +import 'dart:typed_data'; + +import '../../shared/message.dart'; +import '../../shared/status.dart'; + +enum _GrpcWebParseState { Init, Length, Message } + +class GrpcWebDecoder extends Converter { + @override + GrpcMessage convert(ByteBuffer input) { + final sink = GrpcMessageSink(); + startChunkedConversion(sink) + ..add(input) + ..close(); + return sink.message; + } + + @override + Sink startChunkedConversion(Sink sink) { + return _GrpcWebConversionSink(sink); + } +} + +class _GrpcWebConversionSink extends ChunkedConversionSink { + static const int frameTypeData = 0x00; + static const int frameTypeTrailers = 0x80; + + final Sink _out; + + final _dataHeader = Uint8List(4); + + _GrpcWebParseState _state = _GrpcWebParseState.Init; + int _chunkOffset; + int _frameType; + int _dataOffset = 0; + Uint8List _data; + + _GrpcWebConversionSink(this._out); + + int _parseFrameType(List chunkData) { + final frameType = chunkData[_chunkOffset]; + _chunkOffset++; + if (frameType != frameTypeData && frameType != frameTypeTrailers) { + throw GrpcError.unimplemented('Invalid frame type: ${frameType}'); + } + _state = _GrpcWebParseState.Length; + return frameType; + } + + void _parseLength(List chunkData) { + final chunkLength = chunkData.length; + + final headerRemaining = _dataHeader.lengthInBytes - _dataOffset; + final chunkRemaining = chunkLength - _chunkOffset; + final toCopy = min(headerRemaining, chunkRemaining); + _dataHeader.setRange( + _dataOffset, _dataOffset + toCopy, chunkData, _chunkOffset); + _dataOffset += toCopy; + _chunkOffset += toCopy; + if (_dataOffset == _dataHeader.lengthInBytes) { + final dataLength = _dataHeader.buffer.asByteData().getUint32(0); + _dataOffset = 0; + _state = _GrpcWebParseState.Message; + _data = Uint8List(dataLength); + if (dataLength == 0) { + // empty message + _finishMessage(); + } + } + } + + void _parseMessage(List chunkData) { + final dataRemaining = _data.lengthInBytes - _dataOffset; + if (dataRemaining > 0) { + final chunkRemaining = chunkData.length - _chunkOffset; + final toCopy = min(dataRemaining, chunkRemaining); + _data.setRange( + _dataOffset, _dataOffset + toCopy, chunkData, _chunkOffset); + _dataOffset += toCopy; + _chunkOffset += toCopy; + } + if (_dataOffset == _data.lengthInBytes) { + _finishMessage(); + } + } + + void _finishMessage() { + switch (_frameType) { + case frameTypeData: + _out.add(GrpcData(_data, isCompressed: false)); + break; + case frameTypeTrailers: + final stringData = String.fromCharCodes(_data); + final headers = _parseHttp1Headers(stringData); + _out.add(GrpcMetadata(headers)); + break; + } + _state = _GrpcWebParseState.Init; + _data = null; + _dataOffset = 0; + } + + Map _parseHttp1Headers(String stringData) { + final chunks = stringData.trim().split('\r\n'); + final headers = {}; + for (final chunk in chunks) { + final pos = chunk.indexOf(':'); + headers[chunk.substring(0, pos).trim()] = chunk.substring(pos + 1).trim(); + } + return headers; + } + + @override + void add(ByteBuffer chunk) { + _chunkOffset = 0; + final chunkData = chunk.asUint8List(); + while (_chunkOffset < chunk.lengthInBytes) { + switch (_state) { + case _GrpcWebParseState.Init: + _frameType = _parseFrameType(chunkData); + break; + case _GrpcWebParseState.Length: + _parseLength(chunkData); + break; + case _GrpcWebParseState.Message: + _parseMessage(chunkData); + break; + } + } + _chunkOffset = 0; + } + + @override + void close() { + if (_data != null || _dataOffset != 0) { + throw GrpcError.unavailable('Closed in non-idle state'); + } + _out.close(); + } +} diff --git a/lib/src/client/transport/xhr_transport.dart b/lib/src/client/transport/xhr_transport.dart new file mode 100644 index 0000000..cab7871 --- /dev/null +++ b/lib/src/client/transport/xhr_transport.dart @@ -0,0 +1,194 @@ +// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:html'; +import 'dart:typed_data'; + +import 'package:grpc/src/client/call.dart'; +import 'package:meta/meta.dart'; + +import '../../shared/message.dart'; +import '../../shared/status.dart'; +import '../connection.dart'; +import 'transport.dart'; +import 'web_streams.dart'; + +class XhrTransportStream implements GrpcTransportStream { + final HttpRequest _request; + final ErrorHandler _onError; + final Function(XhrTransportStream stream) _onDone; + int _requestBytesRead = 0; + final StreamController _incomingProcessor = StreamController(); + final StreamController _incomingMessages = StreamController(); + final StreamController> _outgoingMessages = StreamController(); + + @override + Stream get incomingMessages => _incomingMessages.stream; + + @override + StreamSink> get outgoingMessages => _outgoingMessages.sink; + + XhrTransportStream(this._request, {onError, onDone}) + : _onError = onError, + _onDone = onDone { + _outgoingMessages.stream + .map(frame) + .listen((data) => _request.send(data), cancelOnError: true); + + _request.onReadyStateChange.listen((data) { + if (_incomingMessages.isClosed) { + return; + } + switch (_request.readyState) { + case HttpRequest.HEADERS_RECEIVED: + _onHeadersReceived(); + break; + case HttpRequest.DONE: + if (_request.status != 200) { + _onError(GrpcError.unavailable( + 'XhrConnection status ${_request.status}')); + } else { + _close(); + } + break; + } + }); + + _request.onError.listen((ProgressEvent event) { + if (_incomingMessages.isClosed) { + return; + } + _onError(GrpcError.unavailable('XhrConnection connection-error')); + terminate(); + }); + + _request.onProgress.listen((_) { + if (_incomingMessages.isClosed) { + return; + } + // Use response over responseText as most browsers don't support + // using responseText during an onProgress event. + final responseString = _request.response as String; + final bytes = Uint8List.fromList( + responseString.substring(_requestBytesRead).codeUnits) + .buffer; + _requestBytesRead = responseString.length; + _incomingProcessor.add(bytes); + }); + + _incomingProcessor.stream + .transform(GrpcWebDecoder()) + .transform(grpcDecompressor()) + .listen(_incomingMessages.add, + onError: _onError, onDone: _incomingMessages.close); + } + + _onHeadersReceived() { + final contentType = _request.getResponseHeader('Content-Type'); + if (_request.status != 200) { + _onError( + GrpcError.unavailable('XhrConnection status ${_request.status}')); + return; + } + if (contentType == null) { + _onError(GrpcError.unavailable('XhrConnection missing Content-Type')); + return; + } + if (!contentType.startsWith('application/grpc')) { + _onError( + GrpcError.unavailable('XhrConnection bad Content-Type $contentType')); + return; + } + if (_request.response == null) { + _onError(GrpcError.unavailable('XhrConnection request null response')); + return; + } + + // Force a metadata message with headers. + final headers = GrpcMetadata(_request.responseHeaders); + _incomingMessages.add(headers); + } + + _close() { + _incomingProcessor.close(); + _outgoingMessages.close(); + _onDone(this); + } + + @override + Future terminate() async { + _close(); + _request.abort(); + } +} + +class XhrClientConnection extends ClientConnection { + final Uri uri; + + final Set _requests = Set(); + + XhrClientConnection(this.uri); + + String get authority => uri.authority; + + void _initializeRequest(HttpRequest request, Map metadata) { + for (final header in metadata.keys) { + request.setRequestHeader(header, metadata[header]); + } + request.setRequestHeader('Content-Type', 'application/grpc-web+proto'); + request.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1'); + request.setRequestHeader('X-Grpc-Web', '1'); + // Overriding the mimetype allows us to stream and parse the data + request.overrideMimeType('text/plain; charset=x-user-defined'); + request.responseType = 'text'; + } + + @visibleForTesting + HttpRequest createHttpRequest() => HttpRequest(); + + @override + GrpcTransportStream makeRequest(String path, Duration timeout, + Map metadata, ErrorHandler onError) { + final HttpRequest request = createHttpRequest(); + request.open('POST', uri.resolve(path).toString()); + + _initializeRequest(request, metadata); + + final XhrTransportStream transportStream = + XhrTransportStream(request, onError: onError, onDone: _removeStream); + _requests.add(transportStream); + return transportStream; + } + + void _removeStream(XhrTransportStream stream) { + _requests.remove(stream); + } + + @override + Future terminate() async { + for (XhrTransportStream request in _requests) { + request.terminate(); + } + } + + @override + void dispatchCall(ClientCall call) { + call.onConnectionReady(this); + } + + @override + Future shutdown() async {} +} diff --git a/lib/src/client/web_channel.dart b/lib/src/client/web_channel.dart new file mode 100644 index 0000000..7c5bf68 --- /dev/null +++ b/lib/src/client/web_channel.dart @@ -0,0 +1,33 @@ +// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; + +import 'channel.dart'; +import 'connection.dart'; +import 'options.dart'; +import 'transport/xhr_transport.dart'; + +/// A channel to a grpc-web endpoint. +class GrpcWebClientChannel extends ClientChannelBase { + final Uri uri; + + GrpcWebClientChannel.xhr(this.uri) : super(); + + @override + ClientConnection createConnection() { + return XhrClientConnection(uri); + } +} diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index 0be1d0d..ea5b05a 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -18,6 +18,7 @@ import 'dart:convert'; import 'package:http2/transport.dart'; +import '../shared/message.dart'; import '../shared/status.dart'; import '../shared/streams.dart'; import '../shared/timeout.dart'; @@ -223,7 +224,7 @@ class ServerHandler_ extends ServiceCall { if (!_headersSent) { sendHeaders(); } - _stream.sendData(GrpcHttpEncoder.frame(bytes)); + _stream.sendData(frame(bytes)); } catch (error) { final grpcError = GrpcError.internal('Error sending response: $error'); if (!_requests.isClosed) { @@ -353,9 +354,6 @@ class ServerHandler_ extends ServiceCall { } } -@Deprecated( - 'This is an internal class, and will not be part of the public interface in next major version.') -// TODO(sigurdm): Remove this class from grpc.dart exports. class ServerHandler extends ServerHandler_ { ServerHandler(Service Function(String service) serviceLookup, stream, [List interceptors = const []]) diff --git a/lib/src/shared/message.dart b/lib/src/shared/message.dart new file mode 100644 index 0000000..0d97c0a --- /dev/null +++ b/lib/src/shared/message.dart @@ -0,0 +1,78 @@ +// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:typed_data'; + +abstract class GrpcMessage {} + +class GrpcMetadata extends GrpcMessage { + final Map metadata; + GrpcMetadata(this.metadata); + + @override + String toString() => 'gRPC Metadata ($metadata)'; +} + +class GrpcData extends GrpcMessage { + final List data; + final bool isCompressed; + GrpcData(this.data, {this.isCompressed}) : assert(data != null); + + @override + String toString() => 'gRPC Data (${data.length} bytes)'; +} + +class GrpcMessageSink extends Sink { + GrpcMessage message; + + @override + void add(GrpcMessage data) { + if (message != null) { + throw 'Too many messages received!'; + } + message = data; + } + + @override + void close() { + if (message == null) { + throw 'No messages received!'; + } + } +} + +List frame(List payload) { + final payloadLength = payload.length; + final bytes = Uint8List(payloadLength + 5); + final header = bytes.buffer.asByteData(0, 5); + header.setUint8(0, 0); // TODO(dart-lang/grpc-dart#6): Handle compression + header.setUint32(1, payloadLength); + bytes.setRange(5, bytes.length, payload); + return bytes; +} + +StreamTransformer grpcDecompressor() => + StreamTransformer.fromHandlers( + handleData: (GrpcMessage value, EventSink sink) { + if (value is GrpcData) { + if (value.isCompressed) { + // TODO(dart-lang/grpc-dart#6): Actually handle decompression. + sink.add(GrpcData(value.data, isCompressed: false)); + return; + } + } + sink.add(value); + }); diff --git a/lib/src/shared/streams.dart b/lib/src/shared/streams.dart index 7957d7e..7a62b77 100644 --- a/lib/src/shared/streams.dart +++ b/lib/src/shared/streams.dart @@ -13,47 +13,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -import 'dart:async'; import 'dart:convert'; import 'dart:math'; import 'dart:typed_data'; import 'package:http2/transport.dart'; +import 'message.dart'; import 'status.dart'; -abstract class GrpcMessage {} - -class GrpcMetadata extends GrpcMessage { - final Map metadata; - GrpcMetadata(this.metadata); - - @override - String toString() => 'gRPC Metadata ($metadata)'; -} - -class GrpcData extends GrpcMessage { - final List data; - final bool isCompressed; - GrpcData(this.data, {this.isCompressed}); - - @override - String toString() => 'gRPC Data (${data.length} bytes)'; -} - -StreamTransformer grpcDecompressor() => - StreamTransformer.fromHandlers( - handleData: (GrpcMessage value, EventSink sink) { - if (value is GrpcData) { - if (value.isCompressed) { - // TODO(dart-lang/grpc-dart#6): Actually handle decompression. - sink.add(GrpcData(value.data, isCompressed: false)); - return; - } - } - sink.add(value); - }); - class GrpcHttpEncoder extends Converter { @override StreamMessage convert(GrpcMessage input) { @@ -68,22 +36,12 @@ class GrpcHttpEncoder extends Converter { } throw GrpcError.internal('Unexpected message type'); } - - static List frame(List payload) { - final payloadLength = payload.length; - final bytes = Uint8List(payloadLength + 5); - final header = bytes.buffer.asByteData(0, 5); - header.setUint8(0, 0); // TODO(dart-lang/grpc-dart#6): Handle compression - header.setUint32(1, payloadLength); - bytes.setRange(5, bytes.length, payload); - return bytes; - } } class GrpcHttpDecoder extends Converter { @override GrpcMessage convert(StreamMessage input) { - final sink = _GrpcMessageSink(); + final sink = GrpcMessageSink(); startChunkedConversion(sink) ..add(input) ..close(); @@ -183,22 +141,3 @@ class _GrpcMessageConversionSink extends ChunkedConversionSink { _out.close(); } } - -class _GrpcMessageSink extends Sink { - GrpcMessage message; - - @override - void add(GrpcMessage data) { - if (message != null) { - throw 'Too many messages received!'; - } - message = data; - } - - @override - void close() { - if (message == null) { - throw 'No messages received!'; - } - } -} diff --git a/pubspec.yaml b/pubspec.yaml index 73c2088..0618c69 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,6 +1,8 @@ name: grpc description: Dart implementation of gRPC, a high performance, open-source universal RPC framework. -version: 1.0.3 + +version: 2.0.0 + author: Dart Team homepage: https://github.com/dart-lang/grpc-dart @@ -15,5 +17,8 @@ dependencies: http2: ^1.0.0 dev_dependencies: + build_runner: ^1.5.2 + build_test: ^0.10.8 + build_web_compilers: ^2.1.1 mockito: ^4.1.0 test: ^1.6.4 diff --git a/test/client_test.dart b/test/client_tests/client_test.dart similarity index 99% rename from test/client_test.dart rename to test/client_tests/client_test.dart index 0045e3c..f599c11 100644 --- a/test/client_test.dart +++ b/test/client_tests/client_test.dart @@ -19,8 +19,8 @@ import 'package:grpc/grpc.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; -import 'src/client_utils.dart'; -import 'src/utils.dart'; +import '../src/client_utils.dart'; +import '../src/utils.dart'; void main() { const dummyValue = 0; diff --git a/test/client_tests/client_xhr_transport_test.dart b/test/client_tests/client_xhr_transport_test.dart new file mode 100644 index 0000000..cfaba8d --- /dev/null +++ b/test/client_tests/client_xhr_transport_test.dart @@ -0,0 +1,205 @@ +// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +@TestOn('browser') + +import 'dart:async'; + +import 'dart:html'; + +import 'package:grpc/src/client/transport/xhr_transport.dart'; +import 'package:grpc/src/shared/message.dart'; +import 'package:mockito/mockito.dart'; + +import 'package:test/test.dart'; + +class MockHttpRequest extends Mock implements HttpRequest { + // ignore: close_sinks + StreamController readyStateChangeController = + StreamController(); + // ignore: close_sinks + StreamController progressController = + StreamController(); + + @override + Stream get onReadyStateChange => readyStateChangeController.stream; + + @override + Stream get onProgress => progressController.stream; + + @override + Stream get onError => StreamController().stream; + + @override + int status = 200; +} + +class MockXhrClientConnection extends XhrClientConnection { + MockXhrClientConnection() : super(Uri.parse('test:8080')); + + MockHttpRequest latestRequest; + + @override + createHttpRequest() { + final request = MockHttpRequest(); + latestRequest = request; + return request; + } +} + +void main() { + test('Make request sends correct headers', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2' + }; + + final connection = MockXhrClientConnection(); + + connection.makeRequest('path', Duration(seconds: 10), metadata, + (error) => fail(error.toString())); + + verify(connection.latestRequest + .setRequestHeader('Content-Type', 'application/grpc-web+proto')); + verify(connection.latestRequest + .setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1')); + verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1')); + verify(connection.latestRequest + .overrideMimeType('text/plain; charset=x-user-defined')); + verify(connection.latestRequest.responseType = 'text'); + }); + + test('Sent data converted to stream properly', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2' + }; + + final connection = MockXhrClientConnection(); + + final stream = connection.makeRequest('path', Duration(seconds: 10), + metadata, (error) => fail(error.toString())); + + final data = List.filled(10, 0); + stream.outgoingMessages.add(data); + await stream.terminate(); + + final expectedData = frame(data); + expect(verify(connection.latestRequest.send(captureAny)).captured.single, + expectedData); + }); + + test('Stream handles headers properly', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2' + }; + + final transport = MockXhrClientConnection(); + + final stream = transport.makeRequest('test_path', Duration(seconds: 10), + metadata, (error) => fail(error.toString())); + + stream.incomingMessages.listen((message) { + expect(message, TypeMatcher()); + if (message is GrpcMetadata) { + message.metadata.forEach((key, value) { + expect(value, metadata[key]); + }); + } + }); + }); + + test('Stream deserializes data properly', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2' + }; + + final connection = MockXhrClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + metadata, (error) => fail(error.toString())); + final data = List.filled(10, 224); + final encoded = frame(data); + final encodedString = String.fromCharCodes(encoded); + + stream.incomingMessages.listen(expectAsync1((message) { + if (message is GrpcData) { + expect(message.data, equals(data)); + } + }, count: 2)); + + when(connection.latestRequest.getResponseHeader('Content-Type')) + .thenReturn('application/grpc+proto'); + when(connection.latestRequest.responseHeaders).thenReturn(metadata); + when(connection.latestRequest.readyState) + .thenReturn(HttpRequest.HEADERS_RECEIVED); + when(connection.latestRequest.response).thenReturn(encodedString); + connection.latestRequest.readyStateChangeController.add(null); + connection.latestRequest.progressController.add(null); + }); + + test('Stream recieves multiple messages', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2' + }; + + final connection = MockXhrClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + metadata, (error) => fail(error.toString())); + + final data = >[ + List.filled(10, 224), + List.filled(5, 124) + ]; + final encoded = data.map((d) => frame(d)); + final encodedStrings = encoded.map((e) => String.fromCharCodes(e)).toList(); + + final expectedMessages = [ + GrpcMetadata(metadata), + GrpcData(data[0]), + GrpcData(data[1]) + ]; + int i = 0; + stream.incomingMessages.listen(expectAsync1((message) { + final expectedMessage = expectedMessages[i]; + i++; + expect(message.runtimeType, expectedMessage.runtimeType); + if (message is GrpcMetadata) { + expect(message.metadata, (expectedMessage as GrpcMetadata).metadata); + } else if (message is GrpcData) { + expect(message.data, (expectedMessage as GrpcData).data); + } + }, count: expectedMessages.length)); + + when(connection.latestRequest.getResponseHeader('Content-Type')) + .thenReturn('application/grpc+proto'); + when(connection.latestRequest.responseHeaders).thenReturn(metadata); + when(connection.latestRequest.readyState) + .thenReturn(HttpRequest.HEADERS_RECEIVED); + // At first - expected response is the first message + when(connection.latestRequest.response) + .thenAnswer((_) => encodedStrings[0]); + connection.latestRequest.readyStateChangeController.add(null); + connection.latestRequest.progressController.add(null); + + // After the first call, expected response should now be both responses together + when(connection.latestRequest.response) + .thenAnswer((_) => encodedStrings[0] + encodedStrings[1]); + connection.latestRequest.progressController.add(null); + }); +} diff --git a/test/grpc_web_decoding_test.dart b/test/grpc_web_decoding_test.dart new file mode 100644 index 0000000..b5420d8 --- /dev/null +++ b/test/grpc_web_decoding_test.dart @@ -0,0 +1,17 @@ +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:grpc/src/client/transport/web_streams.dart'; +import 'package:grpc/src/shared/message.dart'; +import 'package:test/test.dart'; + +main() { + test("decoding an empty repeated", () async { + final GrpcData data = await GrpcWebDecoder() + .bind(Stream.fromIterable([ + Uint8List.fromList([0, 0, 0, 0, 0]).buffer + ])) + .first as GrpcData; + expect(data.data, []); + }); +} diff --git a/test/options_test.dart b/test/options_test.dart index 70404ab..5b94b3f 100644 --- a/test/options_test.dart +++ b/test/options_test.dart @@ -12,10 +12,10 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - +@TestOn('vm') import 'dart:io'; -import 'package:grpc/grpc.dart'; +import 'package:grpc/src/client/transport/http2_credentials.dart'; import 'package:test/test.dart'; const isTlsException = TypeMatcher(); diff --git a/test/server_handles_broken_connection_test.dart b/test/server_handles_broken_connection_test.dart index 09a4375..0c5fc96 100644 --- a/test/server_handles_broken_connection_test.dart +++ b/test/server_handles_broken_connection_test.dart @@ -1,3 +1,4 @@ +@TestOn('vm') import 'dart:async'; import 'dart:isolate'; import 'package:grpc/grpc.dart' as grpc; diff --git a/test/server_test.dart b/test/server_test.dart index 220b4c2..a28698c 100644 --- a/test/server_test.dart +++ b/test/server_test.dart @@ -12,6 +12,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +@TestOn('vm') import 'dart:async'; diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index 00bd499..c68dc70 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -14,7 +14,10 @@ // limitations under the License. import 'dart:async'; +import 'dart:convert'; +import 'package:grpc/src/client/http2_connection.dart'; +import 'package:grpc/src/shared/message.dart'; import 'package:grpc/src/shared/streams.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; @@ -28,7 +31,7 @@ class MockTransport extends Mock implements ClientTransportConnection {} class MockStream extends Mock implements ClientTransportStream {} -class FakeConnection extends ClientConnection { +class FakeConnection extends Http2ClientConnection { final ClientTransportConnection transport; var connectionError; @@ -53,14 +56,14 @@ class FakeChannelOptions implements ChannelOptions { } class FakeChannel extends ClientChannel { - final ClientConnection connection; + final Http2ClientConnection connection; final FakeChannelOptions options; FakeChannel(String host, this.connection, this.options) : super(host, options: options); @override - Future getConnection() async => connection; + Future getConnection() async => connection; } typedef ServerMessageHandler = void Function(StreamMessage message); @@ -141,7 +144,7 @@ class ClientHarness { } void sendResponseValue(int value) { - toClient.add(DataStreamMessage(GrpcHttpEncoder.frame(mockEncode(value)))); + toClient.add(DataStreamMessage(frame(mockEncode(value)))); } void sendResponseTrailer( @@ -183,7 +186,9 @@ class ClientHarness { final List
capturedHeaders = verify(transport.makeRequest(captureAny)).captured.single; - validateRequestHeaders(capturedHeaders, + validateRequestHeaders( + Map.fromEntries(capturedHeaders.map((header) => + MapEntry(utf8.decode(header.name), utf8.decode(header.value)))), path: expectedPath, timeout: toTimeoutString(expectedTimeout), customHeaders: expectedCustomHeaders); diff --git a/test/src/server_utils.dart b/test/src/server_utils.dart index 366cf99..c0c714e 100644 --- a/test/src/server_utils.dart +++ b/test/src/server_utils.dart @@ -15,11 +15,14 @@ import 'dart:async'; +import 'package:grpc/src/client/http2_connection.dart'; +import 'package:grpc/src/shared/message.dart'; import 'package:grpc/src/shared/streams.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; import 'package:grpc/grpc.dart'; +import 'package:grpc/src/client/transport/http2_transport.dart'; import 'utils.dart'; @@ -170,14 +173,14 @@ class ServerHarness { {String authority = 'test', Map metadata, Duration timeout}) { - final headers = ClientConnection.createCallHeaders( + final headers = Http2ClientConnection.createCallHeaders( true, authority, path, timeout, metadata, userAgent: 'dart-grpc/1.0.0 test'); toServer.add(HeadersStreamMessage(headers)); } void sendData(int value) { - toServer.add(DataStreamMessage(GrpcHttpEncoder.frame(mockEncode(value)))); + toServer.add(DataStreamMessage(frame(mockEncode(value)))); } void runTest(String path, List requests, List expectedResponses) { diff --git a/test/src/utils.dart b/test/src/utils.dart index 58f7f8f..c55d32e 100644 --- a/test/src/utils.dart +++ b/test/src/utils.dart @@ -16,6 +16,7 @@ import 'dart:convert'; import 'package:grpc/src/shared/streams.dart'; +import 'package:grpc/src/shared/message.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; @@ -29,26 +30,25 @@ Map headersToMap(List
headers) => Map.fromIterable(headers, key: (h) => ascii.decode(h.name), value: (h) => ascii.decode(h.value)); -void validateRequestHeaders(List
headers, +void validateRequestHeaders(Map headers, {String path, String authority = 'test', String timeout, Map customHeaders}) { - final headerMap = headersToMap(headers); - expect(headerMap[':method'], 'POST'); - expect(headerMap[':scheme'], 'https'); + expect(headers[':method'], 'POST'); + expect(headers[':scheme'], 'https'); if (path != null) { - expect(headerMap[':path'], path); + expect(headers[':path'], path); } - expect(headerMap[':authority'], authority); - expect(headerMap['grpc-timeout'], timeout); - expect(headerMap['content-type'], 'application/grpc'); - expect(headerMap['te'], 'trailers'); - expect(headerMap['grpc-accept-encoding'], 'identity'); - expect(headerMap['user-agent'], startsWith('dart-grpc/')); + expect(headers[':authority'], authority); + expect(headers['grpc-timeout'], timeout); + expect(headers['content-type'], 'application/grpc'); + expect(headers['te'], 'trailers'); + expect(headers['grpc-accept-encoding'], 'identity'); + expect(headers['user-agent'], startsWith('dart-grpc/')); customHeaders?.forEach((key, value) { - expect(headerMap[key], value); + expect(headers[key], value); }); } diff --git a/test/timeout_test.dart b/test/timeout_test.dart index 1719fb4..578a9d9 100644 --- a/test/timeout_test.dart +++ b/test/timeout_test.dart @@ -136,5 +136,5 @@ void main() { ..sendRequestHeader('/Test/Unary', timeout: Duration(microseconds: 1)); await harness.fromServer.done; }); - }); + }, testOn: 'vm'); }