grpc-web support (#191)

* grpc-web support

This commits merges the grpc-web branch into master.

It is based on work by https://github.com/fuzzybinary .
This commit is contained in:
Sigurd Meldgaard 2019-06-17 13:31:07 +02:00 committed by GitHub
parent e65c52070b
commit 91564ff7aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 1963 additions and 514 deletions

View File

@ -1,7 +1,15 @@
language: dart
sudo: false
# Run against both the dev and channel.
# necessary to avoid chrome sandboxing issues
sudo: required
addons:
chrome: stable
# The Chrome addon does not work on windows
install:
- if [ $TRAVIS_OS_NAME = windows ]; then choco install googlechrome ; fi
# Run against both the dev and stable channel.
dart:
- stable
- dev
@ -9,6 +17,7 @@ dart:
# Define test tasks to run.
dart_task:
- test: --platform vm
- test: --platform chrome
# Only run one instance of the formatter and the analyzer, rather than running
# them against each Dart version.
@ -19,8 +28,9 @@ matrix:
- dart: dev
dart_task: dartfmt
script:
- pub get
- dartanalyzer lib test
- for example in example/*; do (cd $example; echo [Analyzing $example]; pub get; dartanalyzer .); done
- for example in example/*/; do (cd $example; echo [Analyzing $example]; pub get; dartanalyzer .); done
- (cd interop; echo [Analyzing interop]; pub get; dartanalyzer .)

View File

@ -1,3 +1,13 @@
## 2.0.0
* Add initial support for grpc-web.
See `example/grpc-web` for an example of this working.
* **Breaking**: `grpc.dart` no longer exposes `ClientConnection`. It was supposed to be an internal
abstraction.
* **Breaking**: `grpc.dart` no longer exposes the deprecated `ServerHandler`.
It was supposed to be an internal abstraction.
* `service_api.dart` no longer exports Server - it has never been used by the generated code.
## 1.0.3
* Allow custom user agent with a `userAgent` argument for `ChannelOptions()`.

5
build.yaml Normal file
View File

@ -0,0 +1,5 @@
targets:
$default:
sources:
exclude:
- example/**

View File

@ -0,0 +1,62 @@
# Description
The grpc-web example shows how to use the Dart gRPC library with a gRPC-Web capable server.
This is meant to be used with the echo example provided by the grpc-web repository. The definition of the service is given in echo.proto.
# Prerequistes
You will need a clone of the [grpc-web](https://github.com/grpc/grpc-web) repository to run the example server.
You will also need the dart 'webdev' tool, which you can get by running:
```sh
$ pub global activate webdev
```
# Run the sample code
Follow the instructions for starting the grpc-web example server. The simplest version of this involves running the grpc-web server in a docker container with:
```sh
$ docker-compose up echo-server envoy
```
To compile and run the example, assuming you are in the root of the grpc-web
folder, i.e., .../example/grpc-web/, first get the dependencies by running:
```sh
$ pub get
```
Compile and run the website with:
```sh
$ webdev serve web:9000
```
Note that the alternate port (9000) is necessary because the grpc-web server runs the grpc server on port 8080 by default (the save as webdev).
You can then navigate to http://localhost:9000/ to try out the example.
# Regenerate the stubs
If you have made changes to the message or service definition in
`protos/echo.proto` and need to regenerate the corresponding Dart files,
you will need to have protoc version 3.0.0 or higher and the Dart protoc plugin
version 16.0.0 or higher on your PATH.
To install protoc, see the instructions on
[the Protocol Buffers website](https://developers.google.com/protocol-buffers/).
The easiest way to get the Dart protoc plugin is by running
```sh
$ pub global activate protoc_plugin
```
and follow the directions to add `~/.pub-cache/bin` to your PATH, if you haven't
already done so.
You can now regenerate the Dart files by running
```sh
$ protoc --dart_out=grpc:lib/src/generated -Iprotos protos/echo.proto
```

View File

@ -0,0 +1,53 @@
import 'dart:async';
import 'dart:html';
import 'package:grpc_web/src/generated/echo.pbgrpc.dart';
class EchoApp {
final EchoServiceClient _service;
EchoApp(this._service);
Future<void> echo(String message) async {
_addLeftMessage(message);
try {
final response = await _service.echo(EchoRequest()..message = message);
_addRightMessage(response.message);
} catch (error) {
_addRightMessage(error.toString());
}
}
void repeatEcho(String message, int count) {
_addLeftMessage(message);
final request = ServerStreamingEchoRequest()
..message = message
..messageCount = count
..messageInterval = 500;
_service.serverStreamingEcho(request).listen((response) {
_addRightMessage(response.message);
}, onError: (error) {
_addRightMessage(error.toString());
}, onDone: () => print('Closed connection to server.'));
}
void _addLeftMessage(String message) {
_addMessage(message, "label-primary pull-left");
}
void _addRightMessage(String message) {
_addMessage(message, "label-default pull-right");
}
void _addMessage(String message, String cssClass) {
final classes = cssClass.split(' ');
querySelector('#first').after(DivElement()
..classes.add('row')
..append(Element.tag('h2')
..append(SpanElement()
..classes.add('label')
..classes.addAll(classes)
..text = message)));
}
}

View File

@ -0,0 +1,183 @@
///
// Generated code. Do not modify.
// source: echo.proto
///
// ignore_for_file: non_constant_identifier_names,library_prefixes,unused_import
// ignore: UNUSED_SHOWN_NAME
import 'dart:core' show int, bool, double, String, List, Map, override;
import 'package:protobuf/protobuf.dart' as $pb;
class EchoRequest extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo('EchoRequest',
package: const $pb.PackageName('grpc.gateway.testing'))
..aOS(1, 'message')
..hasRequiredFields = false;
EchoRequest() : super();
EchoRequest.fromBuffer(List<int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY])
: super.fromBuffer(i, r);
EchoRequest.fromJson(String i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY])
: super.fromJson(i, r);
EchoRequest clone() => EchoRequest()..mergeFromMessage(this);
EchoRequest copyWith(void Function(EchoRequest) updates) =>
super.copyWith((message) => updates(message as EchoRequest));
$pb.BuilderInfo get info_ => _i;
static EchoRequest create() => EchoRequest();
EchoRequest createEmptyInstance() => create();
static $pb.PbList<EchoRequest> createRepeated() => $pb.PbList<EchoRequest>();
static EchoRequest getDefault() => _defaultInstance ??= create()..freeze();
static EchoRequest _defaultInstance;
static void $checkItem(EchoRequest v) {
if (v is! EchoRequest) $pb.checkItemFailed(v, _i.qualifiedMessageName);
}
String get message => $_getS(0, '');
set message(String v) {
$_setString(0, v);
}
bool hasMessage() => $_has(0);
void clearMessage() => clearField(1);
}
class EchoResponse extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo('EchoResponse',
package: const $pb.PackageName('grpc.gateway.testing'))
..aOS(1, 'message')
..hasRequiredFields = false;
EchoResponse() : super();
EchoResponse.fromBuffer(List<int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY])
: super.fromBuffer(i, r);
EchoResponse.fromJson(String i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY])
: super.fromJson(i, r);
EchoResponse clone() => EchoResponse()..mergeFromMessage(this);
EchoResponse copyWith(void Function(EchoResponse) updates) =>
super.copyWith((message) => updates(message as EchoResponse));
$pb.BuilderInfo get info_ => _i;
static EchoResponse create() => EchoResponse();
EchoResponse createEmptyInstance() => create();
static $pb.PbList<EchoResponse> createRepeated() =>
$pb.PbList<EchoResponse>();
static EchoResponse getDefault() => _defaultInstance ??= create()..freeze();
static EchoResponse _defaultInstance;
static void $checkItem(EchoResponse v) {
if (v is! EchoResponse) $pb.checkItemFailed(v, _i.qualifiedMessageName);
}
String get message => $_getS(0, '');
set message(String v) {
$_setString(0, v);
}
bool hasMessage() => $_has(0);
void clearMessage() => clearField(1);
}
class ServerStreamingEchoRequest extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(
'ServerStreamingEchoRequest',
package: const $pb.PackageName('grpc.gateway.testing'))
..aOS(1, 'message')
..a<int>(2, 'messageCount', $pb.PbFieldType.O3)
..a<int>(3, 'messageInterval', $pb.PbFieldType.O3)
..hasRequiredFields = false;
ServerStreamingEchoRequest() : super();
ServerStreamingEchoRequest.fromBuffer(List<int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY])
: super.fromBuffer(i, r);
ServerStreamingEchoRequest.fromJson(String i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY])
: super.fromJson(i, r);
ServerStreamingEchoRequest clone() =>
ServerStreamingEchoRequest()..mergeFromMessage(this);
ServerStreamingEchoRequest copyWith(
void Function(ServerStreamingEchoRequest) updates) =>
super.copyWith(
(message) => updates(message as ServerStreamingEchoRequest));
$pb.BuilderInfo get info_ => _i;
static ServerStreamingEchoRequest create() => ServerStreamingEchoRequest();
ServerStreamingEchoRequest createEmptyInstance() => create();
static $pb.PbList<ServerStreamingEchoRequest> createRepeated() =>
$pb.PbList<ServerStreamingEchoRequest>();
static ServerStreamingEchoRequest getDefault() =>
_defaultInstance ??= create()..freeze();
static ServerStreamingEchoRequest _defaultInstance;
static void $checkItem(ServerStreamingEchoRequest v) {
if (v is! ServerStreamingEchoRequest)
$pb.checkItemFailed(v, _i.qualifiedMessageName);
}
String get message => $_getS(0, '');
set message(String v) {
$_setString(0, v);
}
bool hasMessage() => $_has(0);
void clearMessage() => clearField(1);
int get messageCount => $_get(1, 0);
set messageCount(int v) {
$_setSignedInt32(1, v);
}
bool hasMessageCount() => $_has(1);
void clearMessageCount() => clearField(2);
int get messageInterval => $_get(2, 0);
set messageInterval(int v) {
$_setSignedInt32(2, v);
}
bool hasMessageInterval() => $_has(2);
void clearMessageInterval() => clearField(3);
}
class ServerStreamingEchoResponse extends $pb.GeneratedMessage {
static final $pb.BuilderInfo _i = $pb.BuilderInfo(
'ServerStreamingEchoResponse',
package: const $pb.PackageName('grpc.gateway.testing'))
..aOS(1, 'message')
..hasRequiredFields = false;
ServerStreamingEchoResponse() : super();
ServerStreamingEchoResponse.fromBuffer(List<int> i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY])
: super.fromBuffer(i, r);
ServerStreamingEchoResponse.fromJson(String i,
[$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY])
: super.fromJson(i, r);
ServerStreamingEchoResponse clone() =>
ServerStreamingEchoResponse()..mergeFromMessage(this);
ServerStreamingEchoResponse copyWith(
void Function(ServerStreamingEchoResponse) updates) =>
super.copyWith(
(message) => updates(message as ServerStreamingEchoResponse));
$pb.BuilderInfo get info_ => _i;
static ServerStreamingEchoResponse create() => ServerStreamingEchoResponse();
ServerStreamingEchoResponse createEmptyInstance() => create();
static $pb.PbList<ServerStreamingEchoResponse> createRepeated() =>
$pb.PbList<ServerStreamingEchoResponse>();
static ServerStreamingEchoResponse getDefault() =>
_defaultInstance ??= create()..freeze();
static ServerStreamingEchoResponse _defaultInstance;
static void $checkItem(ServerStreamingEchoResponse v) {
if (v is! ServerStreamingEchoResponse)
$pb.checkItemFailed(v, _i.qualifiedMessageName);
}
String get message => $_getS(0, '');
set message(String v) {
$_setString(0, v);
}
bool hasMessage() => $_has(0);
void clearMessage() => clearField(1);
}

View File

@ -0,0 +1,5 @@
///
// Generated code. Do not modify.
// source: echo.proto
///
// ignore_for_file: non_constant_identifier_names,library_prefixes,unused_import

View File

@ -0,0 +1,79 @@
///
// Generated code. Do not modify.
// source: echo.proto
///
// ignore_for_file: non_constant_identifier_names,library_prefixes,unused_import
import 'dart:async' as $async;
import 'package:grpc/service_api.dart' as $grpc;
import 'echo.pb.dart';
export 'echo.pb.dart';
class EchoServiceClient extends $grpc.Client {
static final _$echo = $grpc.ClientMethod<EchoRequest, EchoResponse>(
'/grpc.gateway.testing.EchoService/Echo',
(EchoRequest value) => value.writeToBuffer(),
(List<int> value) => EchoResponse.fromBuffer(value));
static final _$serverStreamingEcho = $grpc.ClientMethod<
ServerStreamingEchoRequest, ServerStreamingEchoResponse>(
'/grpc.gateway.testing.EchoService/ServerStreamingEcho',
(ServerStreamingEchoRequest value) => value.writeToBuffer(),
(List<int> value) => ServerStreamingEchoResponse.fromBuffer(value));
EchoServiceClient($grpc.ClientChannel channel, {$grpc.CallOptions options})
: super(channel, options: options);
$grpc.ResponseFuture<EchoResponse> echo(EchoRequest request,
{$grpc.CallOptions options}) {
final call = $createCall(_$echo, $async.Stream.fromIterable([request]),
options: options);
return $grpc.ResponseFuture(call);
}
$grpc.ResponseStream<ServerStreamingEchoResponse> serverStreamingEcho(
ServerStreamingEchoRequest request,
{$grpc.CallOptions options}) {
final call = $createCall(
_$serverStreamingEcho, $async.Stream.fromIterable([request]),
options: options);
return $grpc.ResponseStream(call);
}
}
abstract class EchoServiceBase extends $grpc.Service {
String get $name => 'grpc.gateway.testing.EchoService';
EchoServiceBase() {
$addMethod($grpc.ServiceMethod<EchoRequest, EchoResponse>(
'Echo',
echo_Pre,
false,
false,
(List<int> value) => EchoRequest.fromBuffer(value),
(EchoResponse value) => value.writeToBuffer()));
$addMethod($grpc.ServiceMethod<ServerStreamingEchoRequest,
ServerStreamingEchoResponse>(
'ServerStreamingEcho',
serverStreamingEcho_Pre,
false,
true,
(List<int> value) => ServerStreamingEchoRequest.fromBuffer(value),
(ServerStreamingEchoResponse value) => value.writeToBuffer()));
}
$async.Future<EchoResponse> echo_Pre(
$grpc.ServiceCall call, $async.Future request) async {
return echo(call, await request);
}
$async.Stream<ServerStreamingEchoResponse> serverStreamingEcho_Pre(
$grpc.ServiceCall call, $async.Future request) async* {
yield* serverStreamingEcho(
call, (await request) as ServerStreamingEchoRequest);
}
$async.Future<EchoResponse> echo($grpc.ServiceCall call, EchoRequest request);
$async.Stream<ServerStreamingEchoResponse> serverStreamingEcho(
$grpc.ServiceCall call, ServerStreamingEchoRequest request);
}

View File

@ -0,0 +1,35 @@
///
// Generated code. Do not modify.
// source: echo.proto
///
// ignore_for_file: non_constant_identifier_names,library_prefixes,unused_import
const EchoRequest$json = {
'1': 'EchoRequest',
'2': [
{'1': 'message', '3': 1, '4': 1, '5': 9, '10': 'message'},
],
};
const EchoResponse$json = {
'1': 'EchoResponse',
'2': [
{'1': 'message', '3': 1, '4': 1, '5': 9, '10': 'message'},
],
};
const ServerStreamingEchoRequest$json = {
'1': 'ServerStreamingEchoRequest',
'2': [
{'1': 'message', '3': 1, '4': 1, '5': 9, '10': 'message'},
{'1': 'message_count', '3': 2, '4': 1, '5': 5, '10': 'messageCount'},
{'1': 'message_interval', '3': 3, '4': 1, '5': 5, '10': 'messageInterval'},
],
};
const ServerStreamingEchoResponse$json = {
'1': 'ServerStreamingEchoResponse',
'2': [
{'1': 'message', '3': 1, '4': 1, '5': 9, '10': 'message'},
],
};

View File

@ -0,0 +1,41 @@
// Copyright 2018 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package grpc.gateway.testing;
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
message ServerStreamingEchoRequest {
string message = 1;
int32 message_count = 2;
int32 message_interval = 3;
}
message ServerStreamingEchoResponse {
string message = 1;
}
service EchoService {
rpc Echo(EchoRequest) returns (EchoResponse);
rpc ServerStreamingEcho(ServerStreamingEchoRequest)
returns (stream ServerStreamingEchoResponse);
}

View File

@ -0,0 +1,15 @@
name: grpc_web
description: Dart gRPC-Web sample client
homepage: https://github.com/dart-lang/grpc-dart
environment:
sdk: '>=2.0.0 <3.0.0'
dependencies:
grpc:
path: ../../
protobuf: ^0.13.4
dev_dependencies:
build_runner: ^0.10.0
build_web_compilers: '^0.4.1'

View File

@ -0,0 +1,42 @@
<!--
Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
for details. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Echo Example</title>
<link rel="stylesheet" href="//maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css">
<script defer src="main.dart.js"></script>
</head>
<body>
<div class="container">
<div class="row" id="first">
<div class="form-group">
<div class="input-group">
<input type="text" class="form-control" id="msg">
<span class="input-group-btn">
<button class="btn btn-primary" type="button" id="send">Send
</button>
</span>
</div>
<p class="help-block">Example: "Hello", "4 Hello"</p>
</div>
</div>
</div>
</body>
</html>

View File

@ -0,0 +1,47 @@
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:html';
import 'package:grpc/grpc_web.dart';
import 'package:grpc_web/app.dart';
import 'package:grpc_web/src/generated/echo.pbgrpc.dart';
void main() {
final channel = GrpcWebClientChannel.xhr(Uri.parse('http://localhost:8080'));
final service = EchoServiceClient(channel);
final app = EchoApp(service);
final button = querySelector('#send') as ButtonElement;
button.onClick.listen((e) async {
final msg = querySelector('#msg') as TextInputElement;
final value = msg.value.trim();
msg.value = '';
if (value.isEmpty) return false;
if (value.indexOf(' ') > 0) {
final countStr = value.substring(0, value.indexOf(' '));
final count = int.tryParse(countStr);
if (count != null) {
app.repeatEcho(value.substring(value.indexOf(' ') + 1), count);
} else {
app.echo(value);
}
} else {
app.echo(value);
}
});
}

View File

@ -13,23 +13,41 @@
// See the License for the specific language governing permissions and
// limitations under the License.
export 'src/auth/auth.dart';
export 'src/auth/auth.dart'
show
BaseAuthenticator,
HttpBasedAuthenticator,
ComputeEngineAuthenticator,
ServiceAccountAuthenticator,
JwtServiceAccountAuthenticator;
export 'src/client/call.dart';
export 'src/client/channel.dart';
export 'src/client/client.dart';
export 'src/client/common.dart';
export 'src/client/connection.dart';
export 'src/client/method.dart';
export 'src/client/options.dart';
export 'src/client/call.dart' show CallOptions, ClientCall, MetadataProvider;
export 'src/client/client.dart' show Client;
export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
export 'src/client/connection.dart' show ConnectionState;
export 'src/client/http2_channel.dart' show ClientChannel;
export 'src/client/method.dart' show ClientMethod;
export 'src/client/options.dart'
show
defaultIdleTimeout,
BackoffStrategy,
defaultBackoffStrategy,
ChannelOptions;
export 'src/server/call.dart';
export 'src/server/handler.dart' show ServerHandler;
export 'src/server/interceptor.dart';
export 'src/server/server.dart';
export 'src/server/service.dart';
export 'src/client/transport/http2_credentials.dart'
show BadCertificateHandler, allowBadCertificates, ChannelCredentials;
export 'src/shared/security.dart';
export 'src/shared/status.dart';
export 'src/shared/streams.dart';
export 'src/shared/timeout.dart';
export 'src/server/call.dart' show ServiceCall;
export 'src/server/interceptor.dart' show Interceptor;
export 'src/server/server.dart' show ServerTlsCredentials, Server;
export 'src/server/service.dart' show ServiceMethod, Service;
export 'src/shared/message.dart'
show GrpcMessage, GrpcMetadata, GrpcData, grpcDecompressor;
export 'src/shared/security.dart'
show supportedAlpnProtocols, createSecurityContext;
export 'src/shared/status.dart' show StatusCode, GrpcError;
export 'src/shared/streams.dart' show GrpcHttpEncoder, GrpcHttpDecoder;
export 'src/shared/timeout.dart' show toTimeoutString, fromTimeoutString;

25
lib/grpc_web.dart Normal file
View File

@ -0,0 +1,25 @@
// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
export 'src/auth/auth.dart'
show HttpBasedAuthenticator, JwtServiceAccountAuthenticator;
export 'src/client/call.dart' show MetadataProvider, CallOptions;
export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
export 'src/client/web_channel.dart' show GrpcWebClientChannel;
export 'src/shared/status.dart' show StatusCode, GrpcError;

View File

@ -18,11 +18,10 @@
/// Mainly intended to be imported by generated code.
library service_api;
export 'src/client/call.dart' show CallOptions;
export 'src/client/channel.dart' show ClientChannel;
export 'src/client/client.dart' show Client;
export 'src/client/common.dart' show ResponseFuture, ResponseStream;
export 'src/client/method.dart' show ClientMethod;
export 'src/client/options.dart' show CallOptions;
export 'src/server/call.dart' show ServiceCall;
export 'src/server/server.dart' show Server;
export 'src/server/service.dart' show Service, ServiceMethod;

View File

@ -21,7 +21,7 @@ import 'package:googleapis_auth/src/crypto/rsa_sign.dart';
import 'package:grpc/src/shared/status.dart';
import 'package:http/http.dart' as http;
import '../client/options.dart';
import '../client/call.dart';
const _tokenExpirationThreshold = Duration(seconds: 30);

View File

@ -1,4 +1,4 @@
// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -15,15 +15,14 @@
import 'dart:async';
import 'package:http2/transport.dart';
import '../shared/message.dart';
import '../shared/status.dart';
import '../shared/streams.dart';
import 'common.dart';
import 'connection.dart';
import 'method.dart';
import 'options.dart';
import 'transport/transport.dart';
const _reservedHeaders = [
'content-type',
@ -33,6 +32,55 @@ const _reservedHeaders = [
'user-agent',
];
/// Provides per-RPC metadata.
///
/// Metadata providers will be invoked for every RPC, and can add their own
/// metadata to the RPC. If the function returns a [Future], the RPC will await
/// completion of the returned [Future] before transmitting the request.
///
/// The metadata provider is given the current [metadata] map (possibly modified
/// by previous metadata providers) and the [uri] that is being called, and is
/// expected to modify the map before returning or before completing the
/// returned [Future].
typedef FutureOr<void> MetadataProvider(
Map<String, String> metadata, String uri);
/// Runtime options for an RPC.
class CallOptions {
final Map<String, String> metadata;
final Duration timeout;
final List<MetadataProvider> metadataProviders;
CallOptions._(this.metadata, this.timeout, this.metadataProviders);
/// Creates a [CallOptions] object.
///
/// [CallOptions] can specify static [metadata], set the [timeout], and
/// configure per-RPC metadata [providers]. The metadata [providers] are
/// invoked in order for every RPC, and can modify the outgoing metadata
/// (including metadata provided by previous providers).
factory CallOptions(
{Map<String, String> metadata,
Duration timeout,
List<MetadataProvider> providers}) {
return CallOptions._(Map.unmodifiable(metadata ?? {}), timeout,
List.unmodifiable(providers ?? []));
}
factory CallOptions.from(Iterable<CallOptions> options) =>
options.fold(CallOptions(), (p, o) => p.mergedWith(o));
CallOptions mergedWith(CallOptions other) {
if (other == null) return this;
final mergedMetadata = Map.from(metadata)..addAll(other.metadata);
final mergedTimeout = other.timeout ?? timeout;
final mergedProviders = List.from(metadataProviders)
..addAll(other.metadataProviders);
return CallOptions._(Map.unmodifiable(mergedMetadata), mergedTimeout,
List.unmodifiable(mergedProviders));
}
}
/// An active call to a gRPC endpoint.
class ClientCall<Q, R> implements Response {
final ClientMethod<Q, R> _method;
@ -45,9 +93,9 @@ class ClientCall<Q, R> implements Response {
Map<String, String> _headerMetadata;
TransportStream _stream;
GrpcTransportStream _stream;
StreamController<R> _responses;
StreamSubscription<StreamMessage> _requestSubscription;
StreamSubscription<List<int>> _requestSubscription;
StreamSubscription<GrpcMessage> _responseSubscription;
bool isCancelled = false;
@ -60,8 +108,6 @@ class ClientCall<Q, R> implements Response {
}
}
String get path => _method.path;
void onConnectionError(error) {
_terminateWithError(GrpcError.unavailable('Error connecting: $error'));
}
@ -85,6 +131,16 @@ class ClientCall<Q, R> implements Response {
return sanitizedMetadata;
}
// TODO(sigurdm): Find out why we do this.
static String audiencePath(ClientMethod method) {
final lastSlashPos = method.path.lastIndexOf('/');
return lastSlashPos == -1
? method.path
: method.path.substring(0, lastSlashPos);
}
void onConnectionReady(ClientConnection connection) {
if (isCancelled) return;
@ -92,16 +148,10 @@ class ClientCall<Q, R> implements Response {
_sendRequest(connection, _sanitizeMetadata(options.metadata));
} else {
final metadata = Map<String, String>.from(options.metadata);
String audience;
if (connection.options.credentials.isSecure) {
final port = connection.port != 443 ? ':${connection.port}' : '';
final lastSlashPos = path.lastIndexOf('/');
final audiencePath =
lastSlashPos == -1 ? path : path.substring(0, lastSlashPos);
audience = 'https://${connection.authority}$port$audiencePath';
}
Future.forEach(options.metadataProviders,
(provider) => provider(metadata, audience))
Future.forEach(
options.metadataProviders,
(provider) => provider(
metadata, "${connection.authority}${audiencePath(_method)}"))
.then((_) => _sendRequest(connection, _sanitizeMetadata(metadata)))
.catchError(_onMetadataProviderError);
}
@ -113,15 +163,14 @@ class ClientCall<Q, R> implements Response {
void _sendRequest(ClientConnection connection, Map<String, String> metadata) {
try {
_stream = connection.makeRequest(path, options.timeout, metadata);
_stream = connection.makeRequest(
_method.path, options.timeout, metadata, _onRequestError);
} catch (e) {
_terminateWithError(GrpcError.unavailable('Error making call: $e'));
return;
}
_requestSubscription = _requests
.map(_method.requestSerializer)
.map(GrpcHttpEncoder.frame)
.map<StreamMessage>((bytes) => DataStreamMessage(bytes))
.handleError(_onRequestError)
.listen(_stream.outgoingMessages.add,
onError: _stream.outgoingMessages.addError,
@ -143,13 +192,10 @@ class ClientCall<Q, R> implements Response {
if (_stream != null &&
_responses.hasListener &&
_responseSubscription == null) {
_responseSubscription = _stream.incomingMessages
.transform(GrpcHttpDecoder())
.transform(grpcDecompressor())
.listen(_onResponseData,
onError: _onResponseError,
onDone: _onResponseDone,
cancelOnError: true);
_responseSubscription = _stream.incomingMessages.listen(_onResponseData,
onError: _onResponseError,
onDone: _onResponseDone,
cancelOnError: true);
if (_responses.isPaused) {
_responseSubscription.pause();
}
@ -252,7 +298,7 @@ class ClientCall<Q, R> implements Response {
/// Error handler for the requests stream. Something went wrong while trying
/// to send the request to the server. Abort the request, and forward the
/// error to the user code on the [_responses] stream.
void _onRequestError(error) {
void _onRequestError(error, [StackTrace stackTrace]) {
if (error is! GrpcError) {
error = GrpcError.unknown(error.toString());
}
@ -299,7 +345,9 @@ class ClientCall<Q, R> implements Response {
await Future.wait(futures);
}
Future<void> _safeTerminate() {
return _terminate().catchError((_) {});
Future<void> _safeTerminate() async {
try {
await _terminate();
} catch (_) {}
}
}

View File

@ -20,54 +20,59 @@ import '../shared/status.dart';
import 'call.dart';
import 'connection.dart';
import 'method.dart';
import 'options.dart';
/// A channel to a virtual RPC endpoint.
///
/// For each RPC, the channel picks a [ClientConnection] to dispatch the call.
/// RPCs on the same channel may be sent to different connections, depending on
/// load balancing settings.
class ClientChannel {
final String host;
final int port;
final ChannelOptions options;
abstract class ClientChannel {
/// Shuts down this channel.
///
/// No further RPCs can be made on this channel. RPCs already in progress will
/// be allowed to complete.
Future<void> shutdown();
/// Terminates this channel.
///
/// RPCs already in progress will be terminated. No further RPCs can be made
/// on this channel.
Future<void> terminate();
/// Initiates a new RPC on this connection.
ClientCall<Q, R> createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options);
}
/// Auxiliary base class implementing much of ClientChannel.
abstract class ClientChannelBase implements ClientChannel {
// TODO(jakobr): Multiple connections, load balancing.
ClientConnection _connection;
bool _isShutdown = false;
ClientChannel(this.host,
{this.port = 443, this.options = const ChannelOptions()});
ClientChannelBase();
/// Shuts down this channel.
///
/// No further RPCs can be made on this channel. RPCs already in progress will
/// be allowed to complete.
@override
Future<void> shutdown() async {
if (_isShutdown) return;
_isShutdown = true;
if (_connection != null) await _connection.shutdown();
}
/// Terminates this channel.
///
/// RPCs already in progress will be terminated. No further RPCs can be made
/// on this channel.
@override
Future<void> terminate() async {
_isShutdown = true;
if (_connection != null) await _connection.terminate();
}
ClientConnection createConnection();
/// Returns a connection to this [Channel]'s RPC endpoint.
///
/// The connection may be shared between multiple RPCs.
Future<ClientConnection> getConnection() async {
if (_isShutdown) throw GrpcError.unavailable('Channel shutting down.');
return _connection ??= ClientConnection(host, port, options);
return _connection ??= createConnection();
}
/// Initiates a new RPC on this connection.
@override
ClientCall<Q, R> createCall<Q, R>(
ClientMethod<Q, R> method, Stream<Q> requests, CallOptions options) {
final call = ClientCall(method, requests, options);

View File

@ -18,7 +18,6 @@ import 'dart:async';
import 'call.dart';
import 'channel.dart';
import 'method.dart';
import 'options.dart';
/// Base class for client stubs.
class Client {

View File

@ -14,16 +14,10 @@
// limitations under the License.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:http2/transport.dart';
import 'package:meta/meta.dart';
import '../shared/timeout.dart';
import 'call.dart';
import 'options.dart';
import 'transport/transport.dart';
enum ConnectionState {
/// Actively trying to connect.
@ -42,245 +36,25 @@ enum ConnectionState {
shutdown
}
/// A connection to a single RPC endpoint.
///
/// RPCs made on a connection are always sent to the same endpoint.
class ClientConnection {
static final _methodPost = Header.ascii(':method', 'POST');
static final _schemeHttp = Header.ascii(':scheme', 'http');
static final _schemeHttps = Header.ascii(':scheme', 'https');
static final _contentTypeGrpc =
Header.ascii('content-type', 'application/grpc');
static final _teTrailers = Header.ascii('te', 'trailers');
static final _grpcAcceptEncoding =
Header.ascii('grpc-accept-encoding', 'identity');
abstract class ClientConnection {
String get authority;
final String host;
final int port;
final ChannelOptions options;
/// Put [call] on the queue to be dispatched when the connection is ready.
void dispatchCall(ClientCall call);
ConnectionState _state = ConnectionState.idle;
void Function(ClientConnection connection) onStateChanged;
final _pendingCalls = <ClientCall>[];
ClientTransportConnection _transport;
/// Used for idle and reconnect timeout, depending on [_state].
Timer _timer;
Duration _currentReconnectDelay;
ClientConnection(this.host, this.port, this.options);
ConnectionState get state => _state;
static List<Header> createCallHeaders(bool useTls, String authority,
String path, Duration timeout, Map<String, String> metadata,
{String userAgent}) {
final headers = [
_methodPost,
useTls ? _schemeHttps : _schemeHttp,
Header(ascii.encode(':path'), utf8.encode(path)),
Header(ascii.encode(':authority'), utf8.encode(authority)),
];
if (timeout != null) {
headers.add(Header.ascii('grpc-timeout', toTimeoutString(timeout)));
}
headers.addAll([
_contentTypeGrpc,
_teTrailers,
_grpcAcceptEncoding,
Header.ascii('user-agent', userAgent ?? defaultUserAgent),
]);
metadata?.forEach((key, value) {
headers.add(Header(ascii.encode(key), utf8.encode(value)));
});
return headers;
}
String get authority => options.credentials.authority ?? host;
@visibleForTesting
Future<ClientTransportConnection> connectTransport() async {
final securityContext = options.credentials.securityContext;
var socket = await Socket.connect(host, port);
if (_state == ConnectionState.shutdown) {
socket.destroy();
throw 'Shutting down';
}
if (securityContext != null) {
socket = await SecureSocket.secure(socket,
host: authority,
context: securityContext,
onBadCertificate: _validateBadCertificate);
if (_state == ConnectionState.shutdown) {
socket.destroy();
throw 'Shutting down';
}
}
socket.done.then(_handleSocketClosed);
return ClientTransportConnection.viaSocket(socket);
}
bool _validateBadCertificate(X509Certificate certificate) {
final validator = options.credentials.onBadCertificate;
if (validator == null) return false;
return validator(certificate, authority);
}
void _connect() {
if (_state != ConnectionState.idle &&
_state != ConnectionState.transientFailure) {
return;
}
_setState(ConnectionState.connecting);
connectTransport().then((transport) {
_currentReconnectDelay = null;
_transport = transport;
_transport.onActiveStateChanged = _handleActiveStateChanged;
_setState(ConnectionState.ready);
_pendingCalls.forEach(_startCall);
_pendingCalls.clear();
}).catchError(_handleConnectionFailure);
}
void dispatchCall(ClientCall call) {
switch (_state) {
case ConnectionState.ready:
_startCall(call);
break;
case ConnectionState.shutdown:
_shutdownCall(call);
break;
default:
_pendingCalls.add(call);
if (_state == ConnectionState.idle) {
_connect();
}
}
}
ClientTransportStream makeRequest(
String path, Duration timeout, Map<String, String> metadata) {
final headers = createCallHeaders(
options.credentials.isSecure, authority, path, timeout, metadata,
userAgent: options.userAgent);
return _transport.makeRequest(headers);
}
void _startCall(ClientCall call) {
if (call.isCancelled) return;
call.onConnectionReady(this);
}
void _failCall(ClientCall call, dynamic error) {
if (call.isCancelled) return;
call.onConnectionError(error);
}
void _shutdownCall(ClientCall call) {
_failCall(call, 'Connection shutting down.');
}
/// Start a request for [path] with [metadata].
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onRequestFailure);
/// Shuts down this connection.
///
/// No further calls may be made on this connection, but existing calls
/// are allowed to finish.
Future<void> shutdown() async {
if (_state == ConnectionState.shutdown) return null;
_setShutdownState();
await _transport?.finish();
}
Future<void> shutdown();
/// Terminates this connection.
///
/// All open calls are terminated immediately, and no further calls may be
/// made on this connection.
Future<void> terminate() async {
_setShutdownState();
await _transport?.terminate();
}
void _setShutdownState() {
_setState(ConnectionState.shutdown);
_cancelTimer();
_pendingCalls.forEach(_shutdownCall);
_pendingCalls.clear();
}
void _setState(ConnectionState state) {
_state = state;
if (onStateChanged != null) {
onStateChanged(this);
}
}
void _handleIdleTimeout() {
if (_timer == null || _state != ConnectionState.ready) return;
_cancelTimer();
_transport?.finish()?.catchError((_) => {}); // TODO(jakobr): Log error.
_transport = null;
_setState(ConnectionState.idle);
}
void _cancelTimer() {
_timer?.cancel();
_timer = null;
}
void _handleActiveStateChanged(bool isActive) {
if (isActive) {
_cancelTimer();
} else {
if (options.idleTimeout != null) {
_timer ??= Timer(options.idleTimeout, _handleIdleTimeout);
}
}
}
bool _hasPendingCalls() {
// Get rid of pending calls that have timed out.
_pendingCalls.removeWhere((call) => call.isCancelled);
return _pendingCalls.isNotEmpty;
}
void _handleConnectionFailure(error) {
_transport = null;
if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) {
return;
}
// TODO(jakobr): Log error.
_cancelTimer();
_pendingCalls.forEach((call) => _failCall(call, error));
_pendingCalls.clear();
_setState(ConnectionState.idle);
}
void _handleReconnect() {
if (_timer == null || _state != ConnectionState.transientFailure) return;
_cancelTimer();
_connect();
}
void _handleSocketClosed(_) {
_cancelTimer();
_transport = null;
if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) {
// All good.
return;
}
// We were not planning to close the socket.
if (!_hasPendingCalls()) {
// No pending calls. Just hop to idle, and wait for a new RPC.
_setState(ConnectionState.idle);
return;
}
// We have pending RPCs. Reconnect after backoff delay.
_setState(ConnectionState.transientFailure);
_currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay);
_timer = Timer(_currentReconnectDelay, _handleReconnect);
}
Future<void> terminate();
}

View File

@ -0,0 +1,40 @@
// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'channel.dart';
import 'connection.dart';
import 'http2_connection.dart' show Http2ClientConnection;
import 'options.dart';
import 'transport/http2_credentials.dart';
/// A channel to a virtual gRPC endpoint.
///
/// For each RPC, the channel picks a [Http2ClientConnection] to dispatch the call.
/// RPCs on the same channel may be sent to different connections, depending on
/// load balancing settings.
class ClientChannel extends ClientChannelBase {
final String host;
final int port;
final ChannelOptions options;
ClientChannel(this.host,
{this.port = 443, this.options = const ChannelOptions()})
: super();
@override
ClientConnection createConnection() {
return Http2ClientConnection(host, port, options);
}
}

View File

@ -0,0 +1,275 @@
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:http2/transport.dart';
import 'package:meta/meta.dart';
import '../shared/timeout.dart';
import 'call.dart';
import 'connection.dart' hide ClientConnection;
import 'connection.dart' as connection;
import 'options.dart';
import 'transport/http2_credentials.dart';
import 'transport/http2_transport.dart';
import 'transport/transport.dart';
class Http2ClientConnection implements connection.ClientConnection {
static final _methodPost = Header.ascii(':method', 'POST');
static final _schemeHttp = Header.ascii(':scheme', 'http');
static final _schemeHttps = Header.ascii(':scheme', 'https');
static final _contentTypeGrpc =
Header.ascii('content-type', 'application/grpc');
static final _teTrailers = Header.ascii('te', 'trailers');
static final _grpcAcceptEncoding =
Header.ascii('grpc-accept-encoding', 'identity');
final ChannelOptions options;
connection.ConnectionState _state = ConnectionState.idle;
@visibleForTesting
void Function(Http2ClientConnection connection) onStateChanged;
final _pendingCalls = <ClientCall>[];
ClientTransportConnection _transportConnection;
/// Used for idle and reconnect timeout, depending on [_state].
Timer _timer;
Duration _currentReconnectDelay;
final String host;
final int port;
Http2ClientConnection(this.host, this.port, this.options);
ChannelCredentials get credentials => options.credentials;
String get authority =>
options.credentials.authority ?? port == 443 ? host : "$host:$port";
ConnectionState get state => _state;
Future<ClientTransportConnection> connectTransport() async {
var socket = await Socket.connect(host, port);
if (_state == ConnectionState.shutdown) {
socket.destroy();
// TODO(sigurdm): Throw something nicer...
throw 'Shutting down';
}
final securityContext = credentials.securityContext;
if (securityContext != null) {
socket = await SecureSocket.secure(socket,
host: authority,
context: securityContext,
onBadCertificate: _validateBadCertificate);
if (_state == ConnectionState.shutdown) {
socket.destroy();
// TODO(sigurdm): Throw something nicer...
throw 'Shutting down';
}
}
socket.done.then((_) => _handleSocketClosed());
return ClientTransportConnection.viaSocket(socket);
}
void _connect() {
if (_state != ConnectionState.idle &&
_state != ConnectionState.transientFailure) {
return;
}
_setState(ConnectionState.connecting);
connectTransport().then((transport) {
_currentReconnectDelay = null;
_transportConnection = transport;
transport.onActiveStateChanged = _handleActiveStateChanged;
_setState(ConnectionState.ready);
_pendingCalls.forEach(_startCall);
_pendingCalls.clear();
}).catchError(_handleConnectionFailure);
}
void dispatchCall(ClientCall call) {
switch (_state) {
case ConnectionState.ready:
_startCall(call);
break;
case ConnectionState.shutdown:
_shutdownCall(call);
break;
default:
_pendingCalls.add(call);
if (_state == ConnectionState.idle) {
_connect();
}
}
}
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onRequestFailure) {
final headers = createCallHeaders(
credentials.isSecure, authority, path, timeout, metadata,
userAgent: options.userAgent);
final stream = _transportConnection.makeRequest(headers);
return Http2TransportStream(stream, onRequestFailure);
}
void _startCall(ClientCall call) {
if (call.isCancelled) return;
call.onConnectionReady(this);
}
void _failCall(ClientCall call, dynamic error) {
if (call.isCancelled) return;
call.onConnectionError(error);
}
void _shutdownCall(ClientCall call) {
_failCall(call, 'Connection shutting down.');
}
Future<void> shutdown() async {
if (_state == ConnectionState.shutdown) return null;
_setShutdownState();
await _transportConnection?.finish();
}
Future<void> terminate() async {
_setShutdownState();
await _transportConnection?.terminate();
}
void _setShutdownState() {
_setState(ConnectionState.shutdown);
_cancelTimer();
_pendingCalls.forEach(_shutdownCall);
_pendingCalls.clear();
}
void _setState(ConnectionState state) {
_state = state;
if (onStateChanged != null) {
onStateChanged(this);
}
}
void _handleIdleTimeout() {
if (_timer == null || _state != ConnectionState.ready) return;
_cancelTimer();
_transportConnection
?.finish()
?.catchError((_) => {}); // TODO(jakobr): Log error.
_transportConnection = null;
_setState(ConnectionState.idle);
}
void _cancelTimer() {
_timer?.cancel();
_timer = null;
}
void _handleActiveStateChanged(bool isActive) {
if (isActive) {
_cancelTimer();
} else {
if (options.idleTimeout != null) {
_timer ??= Timer(options.idleTimeout, _handleIdleTimeout);
}
}
}
bool _hasPendingCalls() {
// Get rid of pending calls that have timed out.
_pendingCalls.removeWhere((call) => call.isCancelled);
return _pendingCalls.isNotEmpty;
}
void _handleConnectionFailure(error) {
_transportConnection = null;
if (_state == ConnectionState.shutdown || _state == ConnectionState.idle) {
return;
}
// TODO(jakobr): Log error.
_cancelTimer();
_pendingCalls.forEach((call) => _failCall(call, error));
_pendingCalls.clear();
_setState(ConnectionState.idle);
}
void _handleReconnect() {
if (_timer == null || _state != ConnectionState.transientFailure) return;
_cancelTimer();
_connect();
}
void _handleSocketClosed() {
_cancelTimer();
_transportConnection = null;
if (_state == ConnectionState.idle && _state == ConnectionState.shutdown) {
// All good.
return;
}
// We were not planning to close the socket.
if (!_hasPendingCalls()) {
// No pending calls. Just hop to idle, and wait for a new RPC.
_setState(ConnectionState.idle);
return;
}
// We have pending RPCs. Reconnect after backoff delay.
_setState(ConnectionState.transientFailure);
_currentReconnectDelay = options.backoffStrategy(_currentReconnectDelay);
_timer = Timer(_currentReconnectDelay, _handleReconnect);
}
static List<Header> createCallHeaders(bool useTls, String authority,
String path, Duration timeout, Map<String, String> metadata,
{String userAgent}) {
final headers = [
_methodPost,
useTls ? _schemeHttps : _schemeHttp,
Header(ascii.encode(':path'), utf8.encode(path)),
Header(ascii.encode(':authority'), utf8.encode(authority)),
];
if (timeout != null) {
headers.add(Header.ascii('grpc-timeout', toTimeoutString(timeout)));
}
headers.addAll([
_contentTypeGrpc,
_teTrailers,
_grpcAcceptEncoding,
Header.ascii('user-agent', userAgent ?? defaultUserAgent),
]);
metadata?.forEach((key, value) {
headers.add(Header(ascii.encode(key), utf8.encode(value)));
});
return headers;
}
bool _validateBadCertificate(X509Certificate certificate) {
final credentials = this.credentials;
final validator = credentials.onBadCertificate;
if (validator == null) return false;
return validator(certificate, authority);
}
}

View File

@ -13,20 +13,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:io';
import 'dart:math';
import '../shared/security.dart';
import 'transport/http2_credentials.dart';
const defaultIdleTimeout = Duration(minutes: 5);
const defaultUserAgent = 'dart-grpc/1.0.3';
const defaultUserAgent = 'dart-grpc/2.0.0';
typedef BackoffStrategy = Duration Function(Duration lastBackoff);
typedef Duration BackoffStrategy(Duration lastBackoff);
// Backoff algorithm from https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
const _minConnectTimeout = Duration(seconds: 20);
const _initialBackoff = Duration(seconds: 1);
const _maxBackoff = Duration(seconds: 120);
const _multiplier = 1.6;
@ -40,58 +35,6 @@ Duration defaultBackoffStrategy(Duration lastBackoff) {
return nextBackoff < _maxBackoff ? nextBackoff : _maxBackoff;
}
/// Handler for checking certificates that fail validation. If this handler
/// returns `true`, the bad certificate is allowed, and the TLS handshake can
/// continue. If the handler returns `false`, the TLS handshake fails, and the
/// connection is aborted.
typedef BadCertificateHandler = bool Function(
X509Certificate certificate, String host);
/// Bad certificate handler that disables all certificate checks.
/// DO NOT USE IN PRODUCTION!
/// Can be used during development and testing to accept self-signed
/// certificates, etc.
bool allowBadCertificates(X509Certificate certificate, String host) => true;
/// Options controlling TLS security settings on a [ClientChannel].
class ChannelCredentials {
final bool isSecure;
final List<int> _certificateBytes;
final String _certificatePassword;
final String authority;
final BadCertificateHandler onBadCertificate;
const ChannelCredentials._(this.isSecure, this._certificateBytes,
this._certificatePassword, this.authority, this.onBadCertificate);
/// Disable TLS. RPCs are sent in clear text.
const ChannelCredentials.insecure({String authority})
: this._(false, null, null, authority, null);
/// Enable TLS and optionally specify the [certificates] to trust. If
/// [certificates] is not provided, the default trust store is used.
const ChannelCredentials.secure(
{List<int> certificates,
String password,
String authority,
BadCertificateHandler onBadCertificate})
: this._(true, certificates, password, authority, onBadCertificate);
SecurityContext get securityContext {
if (!isSecure) return null;
if (_certificateBytes != null) {
return createSecurityContext(false)
..setTrustedCertificatesBytes(_certificateBytes,
password: _certificatePassword);
}
final context = SecurityContext(withTrustedRoots: true);
if (SecurityContext.alpnSupported) {
context.setAlpnProtocols(supportedAlpnProtocols, false);
}
return context;
}
}
/// Options controlling how connections are made on a [ClientChannel].
class ChannelOptions {
final ChannelCredentials credentials;
@ -109,52 +52,3 @@ class ChannelOptions {
this.userAgent = userAgent ?? defaultUserAgent,
this.backoffStrategy = backoffStrategy ?? defaultBackoffStrategy;
}
/// Provides per-RPC metadata.
///
/// Metadata providers will be invoked for every RPC, and can add their own
/// metadata to the RPC. If the function returns a [Future], the RPC will await
/// completion of the returned [Future] before transmitting the request.
///
/// The metadata provider is given the current [metadata] map (possibly modified
/// by previous metadata providers) and the [uri] that is being called, and is
/// expected to modify the map before returning or before completing the
/// returned [Future].
typedef MetadataProvider = FutureOr<void> Function(
Map<String, String> metadata, String uri);
/// Runtime options for an RPC.
class CallOptions {
final Map<String, String> metadata;
final Duration timeout;
final List<MetadataProvider> metadataProviders;
CallOptions._(this.metadata, this.timeout, this.metadataProviders);
/// Creates a [CallOptions] object.
///
/// [CallOptions] can specify static [metadata], set the [timeout], and
/// configure per-RPC metadata [providers]. The metadata [providers] are
/// invoked in order for every RPC, and can modify the outgoing metadata
/// (including metadata provided by previous providers).
factory CallOptions(
{Map<String, String> metadata,
Duration timeout,
List<MetadataProvider> providers}) {
return CallOptions._(Map.unmodifiable(metadata ?? {}), timeout,
List.unmodifiable(providers ?? []));
}
factory CallOptions.from(Iterable<CallOptions> options) =>
options.fold(CallOptions(), (p, o) => p.mergedWith(o));
CallOptions mergedWith(CallOptions other) {
if (other == null) return this;
final mergedMetadata = Map.from(metadata)..addAll(other.metadata);
final mergedTimeout = other.timeout ?? timeout;
final mergedProviders = List.from(metadataProviders)
..addAll(other.metadataProviders);
return CallOptions._(Map.unmodifiable(mergedMetadata), mergedTimeout,
List.unmodifiable(mergedProviders));
}
}

View File

@ -0,0 +1,68 @@
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:io';
import '../../shared/security.dart';
import '../options.dart' as options;
/// Handler for checking certificates that fail validation. If this handler
/// returns `true`, the bad certificate is allowed, and the TLS handshake can
/// continue. If the handler returns `false`, the TLS handshake fails, and the
/// connection is aborted.
typedef bool BadCertificateHandler(X509Certificate certificate, String host);
/// Bad certificate handler that disables all certificate checks.
/// DO NOT USE IN PRODUCTION!
/// Can be used during development and testing to accept self-signed
/// certificates, etc.
bool allowBadCertificates(X509Certificate certificate, String host) => true;
/// Options controlling TLS security settings on a [ClientChannel].
class ChannelCredentials {
final bool isSecure;
final String authority;
final List<int> _certificateBytes;
final String _certificatePassword;
final BadCertificateHandler onBadCertificate;
const ChannelCredentials._(this.isSecure, this._certificateBytes,
this._certificatePassword, this.authority, this.onBadCertificate);
/// Disable TLS. RPCs are sent in clear text.
const ChannelCredentials.insecure({String authority})
: this._(false, null, null, authority, null);
/// Enable TLS and optionally specify the [certificates] to trust. If
/// [certificates] is not provided, the default trust store is used.
const ChannelCredentials.secure(
{List<int> certificates,
String password,
String authority,
BadCertificateHandler onBadCertificate})
: this._(true, certificates, password, authority, onBadCertificate);
SecurityContext get securityContext {
if (!isSecure) return null;
if (_certificateBytes != null) {
return createSecurityContext(false)
..setTrustedCertificatesBytes(_certificateBytes,
password: _certificatePassword);
}
final context = new SecurityContext(withTrustedRoots: true);
context.setAlpnProtocols(supportedAlpnProtocols, false);
return context;
}
}

View File

@ -0,0 +1,52 @@
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'package:http2/transport.dart';
import '../../shared/message.dart';
import '../../shared/streams.dart';
import 'transport.dart';
class Http2TransportStream extends GrpcTransportStream {
final TransportStream _transportStream;
final Stream<GrpcMessage> incomingMessages;
final StreamController<List<int>> _outgoingMessages = StreamController();
final ErrorHandler _onError;
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
Http2TransportStream(this._transportStream, this._onError)
: incomingMessages = _transportStream.incomingMessages
.transform(GrpcHttpDecoder())
.transform(grpcDecompressor()) {
_outgoingMessages.stream
.map(frame)
.map<StreamMessage>((bytes) => DataStreamMessage(bytes))
.handleError(_onError)
.listen(_transportStream.outgoingMessages.add,
onError: _transportStream.outgoingMessages.addError,
onDone: _transportStream.outgoingMessages.close,
cancelOnError: true);
}
@override
Future<void> terminate() async {
await _outgoingMessages.close();
_transportStream.terminate();
}
}

View File

@ -0,0 +1,29 @@
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import '../../shared/message.dart';
typedef void SocketClosedHandler();
typedef void ActiveStateHandler(bool isActive);
typedef void ErrorHandler(error);
abstract class GrpcTransportStream {
Stream<GrpcMessage> get incomingMessages;
StreamSink<List<int>> get outgoingMessages;
Future<void> terminate();
}

View File

@ -0,0 +1,157 @@
// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
import '../../shared/message.dart';
import '../../shared/status.dart';
enum _GrpcWebParseState { Init, Length, Message }
class GrpcWebDecoder extends Converter<ByteBuffer, GrpcMessage> {
@override
GrpcMessage convert(ByteBuffer input) {
final sink = GrpcMessageSink();
startChunkedConversion(sink)
..add(input)
..close();
return sink.message;
}
@override
Sink<ByteBuffer> startChunkedConversion(Sink<GrpcMessage> sink) {
return _GrpcWebConversionSink(sink);
}
}
class _GrpcWebConversionSink extends ChunkedConversionSink<ByteBuffer> {
static const int frameTypeData = 0x00;
static const int frameTypeTrailers = 0x80;
final Sink<GrpcMessage> _out;
final _dataHeader = Uint8List(4);
_GrpcWebParseState _state = _GrpcWebParseState.Init;
int _chunkOffset;
int _frameType;
int _dataOffset = 0;
Uint8List _data;
_GrpcWebConversionSink(this._out);
int _parseFrameType(List<int> chunkData) {
final frameType = chunkData[_chunkOffset];
_chunkOffset++;
if (frameType != frameTypeData && frameType != frameTypeTrailers) {
throw GrpcError.unimplemented('Invalid frame type: ${frameType}');
}
_state = _GrpcWebParseState.Length;
return frameType;
}
void _parseLength(List<int> chunkData) {
final chunkLength = chunkData.length;
final headerRemaining = _dataHeader.lengthInBytes - _dataOffset;
final chunkRemaining = chunkLength - _chunkOffset;
final toCopy = min(headerRemaining, chunkRemaining);
_dataHeader.setRange(
_dataOffset, _dataOffset + toCopy, chunkData, _chunkOffset);
_dataOffset += toCopy;
_chunkOffset += toCopy;
if (_dataOffset == _dataHeader.lengthInBytes) {
final dataLength = _dataHeader.buffer.asByteData().getUint32(0);
_dataOffset = 0;
_state = _GrpcWebParseState.Message;
_data = Uint8List(dataLength);
if (dataLength == 0) {
// empty message
_finishMessage();
}
}
}
void _parseMessage(List<int> chunkData) {
final dataRemaining = _data.lengthInBytes - _dataOffset;
if (dataRemaining > 0) {
final chunkRemaining = chunkData.length - _chunkOffset;
final toCopy = min(dataRemaining, chunkRemaining);
_data.setRange(
_dataOffset, _dataOffset + toCopy, chunkData, _chunkOffset);
_dataOffset += toCopy;
_chunkOffset += toCopy;
}
if (_dataOffset == _data.lengthInBytes) {
_finishMessage();
}
}
void _finishMessage() {
switch (_frameType) {
case frameTypeData:
_out.add(GrpcData(_data, isCompressed: false));
break;
case frameTypeTrailers:
final stringData = String.fromCharCodes(_data);
final headers = _parseHttp1Headers(stringData);
_out.add(GrpcMetadata(headers));
break;
}
_state = _GrpcWebParseState.Init;
_data = null;
_dataOffset = 0;
}
Map<String, String> _parseHttp1Headers(String stringData) {
final chunks = stringData.trim().split('\r\n');
final headers = <String, String>{};
for (final chunk in chunks) {
final pos = chunk.indexOf(':');
headers[chunk.substring(0, pos).trim()] = chunk.substring(pos + 1).trim();
}
return headers;
}
@override
void add(ByteBuffer chunk) {
_chunkOffset = 0;
final chunkData = chunk.asUint8List();
while (_chunkOffset < chunk.lengthInBytes) {
switch (_state) {
case _GrpcWebParseState.Init:
_frameType = _parseFrameType(chunkData);
break;
case _GrpcWebParseState.Length:
_parseLength(chunkData);
break;
case _GrpcWebParseState.Message:
_parseMessage(chunkData);
break;
}
}
_chunkOffset = 0;
}
@override
void close() {
if (_data != null || _dataOffset != 0) {
throw GrpcError.unavailable('Closed in non-idle state');
}
_out.close();
}
}

View File

@ -0,0 +1,194 @@
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:html';
import 'dart:typed_data';
import 'package:grpc/src/client/call.dart';
import 'package:meta/meta.dart';
import '../../shared/message.dart';
import '../../shared/status.dart';
import '../connection.dart';
import 'transport.dart';
import 'web_streams.dart';
class XhrTransportStream implements GrpcTransportStream {
final HttpRequest _request;
final ErrorHandler _onError;
final Function(XhrTransportStream stream) _onDone;
int _requestBytesRead = 0;
final StreamController<ByteBuffer> _incomingProcessor = StreamController();
final StreamController<GrpcMessage> _incomingMessages = StreamController();
final StreamController<List<int>> _outgoingMessages = StreamController();
@override
Stream<GrpcMessage> get incomingMessages => _incomingMessages.stream;
@override
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
XhrTransportStream(this._request, {onError, onDone})
: _onError = onError,
_onDone = onDone {
_outgoingMessages.stream
.map(frame)
.listen((data) => _request.send(data), cancelOnError: true);
_request.onReadyStateChange.listen((data) {
if (_incomingMessages.isClosed) {
return;
}
switch (_request.readyState) {
case HttpRequest.HEADERS_RECEIVED:
_onHeadersReceived();
break;
case HttpRequest.DONE:
if (_request.status != 200) {
_onError(GrpcError.unavailable(
'XhrConnection status ${_request.status}'));
} else {
_close();
}
break;
}
});
_request.onError.listen((ProgressEvent event) {
if (_incomingMessages.isClosed) {
return;
}
_onError(GrpcError.unavailable('XhrConnection connection-error'));
terminate();
});
_request.onProgress.listen((_) {
if (_incomingMessages.isClosed) {
return;
}
// Use response over responseText as most browsers don't support
// using responseText during an onProgress event.
final responseString = _request.response as String;
final bytes = Uint8List.fromList(
responseString.substring(_requestBytesRead).codeUnits)
.buffer;
_requestBytesRead = responseString.length;
_incomingProcessor.add(bytes);
});
_incomingProcessor.stream
.transform(GrpcWebDecoder())
.transform(grpcDecompressor())
.listen(_incomingMessages.add,
onError: _onError, onDone: _incomingMessages.close);
}
_onHeadersReceived() {
final contentType = _request.getResponseHeader('Content-Type');
if (_request.status != 200) {
_onError(
GrpcError.unavailable('XhrConnection status ${_request.status}'));
return;
}
if (contentType == null) {
_onError(GrpcError.unavailable('XhrConnection missing Content-Type'));
return;
}
if (!contentType.startsWith('application/grpc')) {
_onError(
GrpcError.unavailable('XhrConnection bad Content-Type $contentType'));
return;
}
if (_request.response == null) {
_onError(GrpcError.unavailable('XhrConnection request null response'));
return;
}
// Force a metadata message with headers.
final headers = GrpcMetadata(_request.responseHeaders);
_incomingMessages.add(headers);
}
_close() {
_incomingProcessor.close();
_outgoingMessages.close();
_onDone(this);
}
@override
Future<void> terminate() async {
_close();
_request.abort();
}
}
class XhrClientConnection extends ClientConnection {
final Uri uri;
final Set<XhrTransportStream> _requests = Set<XhrTransportStream>();
XhrClientConnection(this.uri);
String get authority => uri.authority;
void _initializeRequest(HttpRequest request, Map<String, String> metadata) {
for (final header in metadata.keys) {
request.setRequestHeader(header, metadata[header]);
}
request.setRequestHeader('Content-Type', 'application/grpc-web+proto');
request.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1');
request.setRequestHeader('X-Grpc-Web', '1');
// Overriding the mimetype allows us to stream and parse the data
request.overrideMimeType('text/plain; charset=x-user-defined');
request.responseType = 'text';
}
@visibleForTesting
HttpRequest createHttpRequest() => HttpRequest();
@override
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onError) {
final HttpRequest request = createHttpRequest();
request.open('POST', uri.resolve(path).toString());
_initializeRequest(request, metadata);
final XhrTransportStream transportStream =
XhrTransportStream(request, onError: onError, onDone: _removeStream);
_requests.add(transportStream);
return transportStream;
}
void _removeStream(XhrTransportStream stream) {
_requests.remove(stream);
}
@override
Future<void> terminate() async {
for (XhrTransportStream request in _requests) {
request.terminate();
}
}
@override
void dispatchCall(ClientCall call) {
call.onConnectionReady(this);
}
@override
Future<void> shutdown() async {}
}

View File

@ -0,0 +1,33 @@
// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'channel.dart';
import 'connection.dart';
import 'options.dart';
import 'transport/xhr_transport.dart';
/// A channel to a grpc-web endpoint.
class GrpcWebClientChannel extends ClientChannelBase {
final Uri uri;
GrpcWebClientChannel.xhr(this.uri) : super();
@override
ClientConnection createConnection() {
return XhrClientConnection(uri);
}
}

View File

@ -18,6 +18,7 @@ import 'dart:convert';
import 'package:http2/transport.dart';
import '../shared/message.dart';
import '../shared/status.dart';
import '../shared/streams.dart';
import '../shared/timeout.dart';
@ -223,7 +224,7 @@ class ServerHandler_ extends ServiceCall {
if (!_headersSent) {
sendHeaders();
}
_stream.sendData(GrpcHttpEncoder.frame(bytes));
_stream.sendData(frame(bytes));
} catch (error) {
final grpcError = GrpcError.internal('Error sending response: $error');
if (!_requests.isClosed) {
@ -353,9 +354,6 @@ class ServerHandler_ extends ServiceCall {
}
}
@Deprecated(
'This is an internal class, and will not be part of the public interface in next major version.')
// TODO(sigurdm): Remove this class from grpc.dart exports.
class ServerHandler extends ServerHandler_ {
ServerHandler(Service Function(String service) serviceLookup, stream,
[List<Interceptor> interceptors = const <Interceptor>[]])

View File

@ -0,0 +1,78 @@
// Copyright (c) 2019, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:typed_data';
abstract class GrpcMessage {}
class GrpcMetadata extends GrpcMessage {
final Map<String, String> metadata;
GrpcMetadata(this.metadata);
@override
String toString() => 'gRPC Metadata ($metadata)';
}
class GrpcData extends GrpcMessage {
final List<int> data;
final bool isCompressed;
GrpcData(this.data, {this.isCompressed}) : assert(data != null);
@override
String toString() => 'gRPC Data (${data.length} bytes)';
}
class GrpcMessageSink extends Sink<GrpcMessage> {
GrpcMessage message;
@override
void add(GrpcMessage data) {
if (message != null) {
throw 'Too many messages received!';
}
message = data;
}
@override
void close() {
if (message == null) {
throw 'No messages received!';
}
}
}
List<int> frame(List<int> payload) {
final payloadLength = payload.length;
final bytes = Uint8List(payloadLength + 5);
final header = bytes.buffer.asByteData(0, 5);
header.setUint8(0, 0); // TODO(dart-lang/grpc-dart#6): Handle compression
header.setUint32(1, payloadLength);
bytes.setRange(5, bytes.length, payload);
return bytes;
}
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor() =>
StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
handleData: (GrpcMessage value, EventSink<GrpcMessage> sink) {
if (value is GrpcData) {
if (value.isCompressed) {
// TODO(dart-lang/grpc-dart#6): Actually handle decompression.
sink.add(GrpcData(value.data, isCompressed: false));
return;
}
}
sink.add(value);
});

View File

@ -13,47 +13,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'dart:typed_data';
import 'package:http2/transport.dart';
import 'message.dart';
import 'status.dart';
abstract class GrpcMessage {}
class GrpcMetadata extends GrpcMessage {
final Map<String, String> metadata;
GrpcMetadata(this.metadata);
@override
String toString() => 'gRPC Metadata ($metadata)';
}
class GrpcData extends GrpcMessage {
final List<int> data;
final bool isCompressed;
GrpcData(this.data, {this.isCompressed});
@override
String toString() => 'gRPC Data (${data.length} bytes)';
}
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor() =>
StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
handleData: (GrpcMessage value, EventSink<GrpcMessage> sink) {
if (value is GrpcData) {
if (value.isCompressed) {
// TODO(dart-lang/grpc-dart#6): Actually handle decompression.
sink.add(GrpcData(value.data, isCompressed: false));
return;
}
}
sink.add(value);
});
class GrpcHttpEncoder extends Converter<GrpcMessage, StreamMessage> {
@override
StreamMessage convert(GrpcMessage input) {
@ -68,22 +36,12 @@ class GrpcHttpEncoder extends Converter<GrpcMessage, StreamMessage> {
}
throw GrpcError.internal('Unexpected message type');
}
static List<int> frame(List<int> payload) {
final payloadLength = payload.length;
final bytes = Uint8List(payloadLength + 5);
final header = bytes.buffer.asByteData(0, 5);
header.setUint8(0, 0); // TODO(dart-lang/grpc-dart#6): Handle compression
header.setUint32(1, payloadLength);
bytes.setRange(5, bytes.length, payload);
return bytes;
}
}
class GrpcHttpDecoder extends Converter<StreamMessage, GrpcMessage> {
@override
GrpcMessage convert(StreamMessage input) {
final sink = _GrpcMessageSink();
final sink = GrpcMessageSink();
startChunkedConversion(sink)
..add(input)
..close();
@ -183,22 +141,3 @@ class _GrpcMessageConversionSink extends ChunkedConversionSink<StreamMessage> {
_out.close();
}
}
class _GrpcMessageSink extends Sink<GrpcMessage> {
GrpcMessage message;
@override
void add(GrpcMessage data) {
if (message != null) {
throw 'Too many messages received!';
}
message = data;
}
@override
void close() {
if (message == null) {
throw 'No messages received!';
}
}
}

View File

@ -1,6 +1,8 @@
name: grpc
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
version: 1.0.3
version: 2.0.0
author: Dart Team <misc@dartlang.org>
homepage: https://github.com/dart-lang/grpc-dart
@ -15,5 +17,8 @@ dependencies:
http2: ^1.0.0
dev_dependencies:
build_runner: ^1.5.2
build_test: ^0.10.8
build_web_compilers: ^2.1.1
mockito: ^4.1.0
test: ^1.6.4

View File

@ -19,8 +19,8 @@ import 'package:grpc/grpc.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';
import 'src/client_utils.dart';
import 'src/utils.dart';
import '../src/client_utils.dart';
import '../src/utils.dart';
void main() {
const dummyValue = 0;

View File

@ -0,0 +1,205 @@
// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
@TestOn('browser')
import 'dart:async';
import 'dart:html';
import 'package:grpc/src/client/transport/xhr_transport.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:mockito/mockito.dart';
import 'package:test/test.dart';
class MockHttpRequest extends Mock implements HttpRequest {
// ignore: close_sinks
StreamController<Event> readyStateChangeController =
StreamController<Event>();
// ignore: close_sinks
StreamController<ProgressEvent> progressController =
StreamController<ProgressEvent>();
@override
Stream<Event> get onReadyStateChange => readyStateChangeController.stream;
@override
Stream<ProgressEvent> get onProgress => progressController.stream;
@override
Stream<ProgressEvent> get onError => StreamController<ProgressEvent>().stream;
@override
int status = 200;
}
class MockXhrClientConnection extends XhrClientConnection {
MockXhrClientConnection() : super(Uri.parse('test:8080'));
MockHttpRequest latestRequest;
@override
createHttpRequest() {
final request = MockHttpRequest();
latestRequest = request;
return request;
}
}
void main() {
test('Make request sends correct headers', () async {
final metadata = <String, String>{
'parameter_1': 'value_1',
'parameter_2': 'value_2'
};
final connection = MockXhrClientConnection();
connection.makeRequest('path', Duration(seconds: 10), metadata,
(error) => fail(error.toString()));
verify(connection.latestRequest
.setRequestHeader('Content-Type', 'application/grpc-web+proto'));
verify(connection.latestRequest
.setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1'));
verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1'));
verify(connection.latestRequest
.overrideMimeType('text/plain; charset=x-user-defined'));
verify(connection.latestRequest.responseType = 'text');
});
test('Sent data converted to stream properly', () async {
final metadata = <String, String>{
'parameter_1': 'value_1',
'parameter_2': 'value_2'
};
final connection = MockXhrClientConnection();
final stream = connection.makeRequest('path', Duration(seconds: 10),
metadata, (error) => fail(error.toString()));
final data = List.filled(10, 0);
stream.outgoingMessages.add(data);
await stream.terminate();
final expectedData = frame(data);
expect(verify(connection.latestRequest.send(captureAny)).captured.single,
expectedData);
});
test('Stream handles headers properly', () async {
final metadata = <String, String>{
'parameter_1': 'value_1',
'parameter_2': 'value_2'
};
final transport = MockXhrClientConnection();
final stream = transport.makeRequest('test_path', Duration(seconds: 10),
metadata, (error) => fail(error.toString()));
stream.incomingMessages.listen((message) {
expect(message, TypeMatcher<GrpcMetadata>());
if (message is GrpcMetadata) {
message.metadata.forEach((key, value) {
expect(value, metadata[key]);
});
}
});
});
test('Stream deserializes data properly', () async {
final metadata = <String, String>{
'parameter_1': 'value_1',
'parameter_2': 'value_2'
};
final connection = MockXhrClientConnection();
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
metadata, (error) => fail(error.toString()));
final data = List<int>.filled(10, 224);
final encoded = frame(data);
final encodedString = String.fromCharCodes(encoded);
stream.incomingMessages.listen(expectAsync1((message) {
if (message is GrpcData) {
expect(message.data, equals(data));
}
}, count: 2));
when(connection.latestRequest.getResponseHeader('Content-Type'))
.thenReturn('application/grpc+proto');
when(connection.latestRequest.responseHeaders).thenReturn(metadata);
when(connection.latestRequest.readyState)
.thenReturn(HttpRequest.HEADERS_RECEIVED);
when(connection.latestRequest.response).thenReturn(encodedString);
connection.latestRequest.readyStateChangeController.add(null);
connection.latestRequest.progressController.add(null);
});
test('Stream recieves multiple messages', () async {
final metadata = <String, String>{
'parameter_1': 'value_1',
'parameter_2': 'value_2'
};
final connection = MockXhrClientConnection();
final stream = connection.makeRequest('test_path', Duration(seconds: 10),
metadata, (error) => fail(error.toString()));
final data = <List<int>>[
List<int>.filled(10, 224),
List<int>.filled(5, 124)
];
final encoded = data.map((d) => frame(d));
final encodedStrings = encoded.map((e) => String.fromCharCodes(e)).toList();
final expectedMessages = <GrpcMessage>[
GrpcMetadata(metadata),
GrpcData(data[0]),
GrpcData(data[1])
];
int i = 0;
stream.incomingMessages.listen(expectAsync1((message) {
final expectedMessage = expectedMessages[i];
i++;
expect(message.runtimeType, expectedMessage.runtimeType);
if (message is GrpcMetadata) {
expect(message.metadata, (expectedMessage as GrpcMetadata).metadata);
} else if (message is GrpcData) {
expect(message.data, (expectedMessage as GrpcData).data);
}
}, count: expectedMessages.length));
when(connection.latestRequest.getResponseHeader('Content-Type'))
.thenReturn('application/grpc+proto');
when(connection.latestRequest.responseHeaders).thenReturn(metadata);
when(connection.latestRequest.readyState)
.thenReturn(HttpRequest.HEADERS_RECEIVED);
// At first - expected response is the first message
when(connection.latestRequest.response)
.thenAnswer((_) => encodedStrings[0]);
connection.latestRequest.readyStateChangeController.add(null);
connection.latestRequest.progressController.add(null);
// After the first call, expected response should now be both responses together
when(connection.latestRequest.response)
.thenAnswer((_) => encodedStrings[0] + encodedStrings[1]);
connection.latestRequest.progressController.add(null);
});
}

View File

@ -0,0 +1,17 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:grpc/src/client/transport/web_streams.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:test/test.dart';
main() {
test("decoding an empty repeated", () async {
final GrpcData data = await GrpcWebDecoder()
.bind(Stream.fromIterable([
Uint8List.fromList([0, 0, 0, 0, 0]).buffer
]))
.first as GrpcData;
expect(data.data, []);
});
}

View File

@ -12,10 +12,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
@TestOn('vm')
import 'dart:io';
import 'package:grpc/grpc.dart';
import 'package:grpc/src/client/transport/http2_credentials.dart';
import 'package:test/test.dart';
const isTlsException = TypeMatcher<TlsException>();

View File

@ -1,3 +1,4 @@
@TestOn('vm')
import 'dart:async';
import 'dart:isolate';
import 'package:grpc/grpc.dart' as grpc;

View File

@ -12,6 +12,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
@TestOn('vm')
import 'dart:async';

View File

@ -14,7 +14,10 @@
// limitations under the License.
import 'dart:async';
import 'dart:convert';
import 'package:grpc/src/client/http2_connection.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:grpc/src/shared/streams.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';
@ -28,7 +31,7 @@ class MockTransport extends Mock implements ClientTransportConnection {}
class MockStream extends Mock implements ClientTransportStream {}
class FakeConnection extends ClientConnection {
class FakeConnection extends Http2ClientConnection {
final ClientTransportConnection transport;
var connectionError;
@ -53,14 +56,14 @@ class FakeChannelOptions implements ChannelOptions {
}
class FakeChannel extends ClientChannel {
final ClientConnection connection;
final Http2ClientConnection connection;
final FakeChannelOptions options;
FakeChannel(String host, this.connection, this.options)
: super(host, options: options);
@override
Future<ClientConnection> getConnection() async => connection;
Future<Http2ClientConnection> getConnection() async => connection;
}
typedef ServerMessageHandler = void Function(StreamMessage message);
@ -141,7 +144,7 @@ class ClientHarness {
}
void sendResponseValue(int value) {
toClient.add(DataStreamMessage(GrpcHttpEncoder.frame(mockEncode(value))));
toClient.add(DataStreamMessage(frame(mockEncode(value))));
}
void sendResponseTrailer(
@ -183,7 +186,9 @@ class ClientHarness {
final List<Header> capturedHeaders =
verify(transport.makeRequest(captureAny)).captured.single;
validateRequestHeaders(capturedHeaders,
validateRequestHeaders(
Map.fromEntries(capturedHeaders.map((header) =>
MapEntry(utf8.decode(header.name), utf8.decode(header.value)))),
path: expectedPath,
timeout: toTimeoutString(expectedTimeout),
customHeaders: expectedCustomHeaders);

View File

@ -15,11 +15,14 @@
import 'dart:async';
import 'package:grpc/src/client/http2_connection.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:grpc/src/shared/streams.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';
import 'package:grpc/grpc.dart';
import 'package:grpc/src/client/transport/http2_transport.dart';
import 'utils.dart';
@ -170,14 +173,14 @@ class ServerHarness {
{String authority = 'test',
Map<String, String> metadata,
Duration timeout}) {
final headers = ClientConnection.createCallHeaders(
final headers = Http2ClientConnection.createCallHeaders(
true, authority, path, timeout, metadata,
userAgent: 'dart-grpc/1.0.0 test');
toServer.add(HeadersStreamMessage(headers));
}
void sendData(int value) {
toServer.add(DataStreamMessage(GrpcHttpEncoder.frame(mockEncode(value))));
toServer.add(DataStreamMessage(frame(mockEncode(value))));
}
void runTest(String path, List<int> requests, List<int> expectedResponses) {

View File

@ -16,6 +16,7 @@
import 'dart:convert';
import 'package:grpc/src/shared/streams.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';
@ -29,26 +30,25 @@ Map<String, String> headersToMap(List<Header> headers) =>
Map.fromIterable(headers,
key: (h) => ascii.decode(h.name), value: (h) => ascii.decode(h.value));
void validateRequestHeaders(List<Header> headers,
void validateRequestHeaders(Map<String, String> headers,
{String path,
String authority = 'test',
String timeout,
Map<String, String> customHeaders}) {
final headerMap = headersToMap(headers);
expect(headerMap[':method'], 'POST');
expect(headerMap[':scheme'], 'https');
expect(headers[':method'], 'POST');
expect(headers[':scheme'], 'https');
if (path != null) {
expect(headerMap[':path'], path);
expect(headers[':path'], path);
}
expect(headerMap[':authority'], authority);
expect(headerMap['grpc-timeout'], timeout);
expect(headerMap['content-type'], 'application/grpc');
expect(headerMap['te'], 'trailers');
expect(headerMap['grpc-accept-encoding'], 'identity');
expect(headerMap['user-agent'], startsWith('dart-grpc/'));
expect(headers[':authority'], authority);
expect(headers['grpc-timeout'], timeout);
expect(headers['content-type'], 'application/grpc');
expect(headers['te'], 'trailers');
expect(headers['grpc-accept-encoding'], 'identity');
expect(headers['user-agent'], startsWith('dart-grpc/'));
customHeaders?.forEach((key, value) {
expect(headerMap[key], value);
expect(headers[key], value);
});
}

View File

@ -136,5 +136,5 @@ void main() {
..sendRequestHeader('/Test/Unary', timeout: Duration(microseconds: 1));
await harness.fromServer.done;
});
});
}, testOn: 'vm');
}