Add TLS support. (#29)

Added ChannelOptions, which are used to specify options on a
ClientChannel. At the moment, only TLS options are supported.

Moved CallOptions from ClientChannel to a new Client stub base class.
Per-RPC call options are now specified on the stub instead of on the
channel, allowing several stubs with different options to share the same
channel.

Added support for TLS on the server side. TLS options are specified when
creating the Server.
This commit is contained in:
Jakob Andersen 2017-08-17 09:45:11 +02:00 committed by GitHub
parent 5c4e50c696
commit 83ee9c2edb
14 changed files with 246 additions and 56 deletions

View File

@ -13,7 +13,8 @@ class Client {
MetadataClient stub;
Future<Null> main(List<String> args) async {
channel = new ClientChannel('127.0.0.1', port: 8080);
channel = new ClientChannel('127.0.0.1',
port: 8080, options: const ChannelOptions.insecure());
stub = new MetadataClient(channel);
// Run all of the demos in order.
await runEcho();
@ -41,11 +42,10 @@ class Client {
// Run the echo with delay cancel demo. ...
Future<Null> runEchoDelayCancel() async {
final channelWithOptions = new ClientChannel('127.0.0.1',
port: 8080, options: new CallOptions(metadata: {'peer': 'Verner'}));
final stubWithCustomChannel = new MetadataClient(channelWithOptions);
final stubWithCustomOptions = new MetadataClient(channel,
options: new CallOptions(metadata: {'peer': 'Verner'}));
final request = new Record()..value = 'Kaj';
final call = stubWithCustomChannel.echo(request,
final call = stubWithCustomOptions.echo(request,
options: new CallOptions(metadata: {'delay': '1'}));
call.headers.then((headers) {
print('Received header metadata: $headers');
@ -61,7 +61,6 @@ class Client {
} catch (error) {
print('Expected error: $error');
}
await channelWithOptions.close();
}
// Run the addOne cancel demo.

View File

@ -12,9 +12,7 @@ import 'package:grpc/grpc.dart';
import 'metadata.pb.dart';
export 'metadata.pb.dart';
class MetadataClient {
final ClientChannel _channel;
class MetadataClient extends Client {
static final _$echo = new ClientMethod<Record, Record>(
'/grpc.Metadata/Echo',
(Record value) => value.writeToBuffer(),
@ -28,10 +26,11 @@ class MetadataClient {
(Empty value) => value.writeToBuffer(),
(List<int> value) => new Number.fromBuffer(value));
MetadataClient(this._channel);
MetadataClient(ClientChannel channel, {CallOptions options})
: super(channel, options: options);
ResponseFuture<Record> echo(Record request, {CallOptions options}) {
final call = new ClientCall(_channel, _$echo, options: options);
final call = $createCall(_$echo, options: options);
call.request
..add(request)
..close();
@ -39,13 +38,13 @@ class MetadataClient {
}
ResponseStream<Number> addOne(Stream<Number> request, {CallOptions options}) {
final call = new ClientCall(_channel, _$addOne, options: options);
final call = $createCall(_$addOne, options: options);
request.pipe(call.request);
return new ResponseStream(call);
}
ResponseStream<Number> fibonacci(Empty request, {CallOptions options}) {
final call = new ClientCall(_channel, _$fibonacci, options: options);
final call = $createCall(_$fibonacci, options: options);
call.request
..add(request)
..close();

View File

@ -66,8 +66,8 @@ class MetadataService extends MetadataServiceBase {
class Server {
Future<Null> main(List<String> args) async {
final server = new grpc.Server(port: 8080)
..addService(new MetadataService());
final server =
new grpc.Server.insecure([new MetadataService()], port: 8080);
await server.serve();
print('Server listening...');
}

View File

@ -16,7 +16,8 @@ class Client {
RouteGuideClient stub;
Future<Null> main(List<String> args) async {
channel = new ClientChannel('127.0.0.1', port: 8080);
channel = new ClientChannel('127.0.0.1',
port: 8080, options: const ChannelOptions.insecure());
stub = new RouteGuideClient(channel);
// Run all of the demos in order.
await runGetFeature();

View File

@ -128,8 +128,8 @@ class RouteGuideService extends RouteGuideServiceBase {
class Server {
Future<Null> main(List<String> args) async {
final server = new grpc.Server(port: 8080)
..addService(new RouteGuideService());
final server =
new grpc.Server.insecure([new RouteGuideService()], port: 8080);
await server.serve();
print('Server listening...');
}

View File

@ -4,6 +4,7 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:args/args.dart';
import 'package:grpc/grpc.dart';
@ -105,9 +106,24 @@ Future<Null> main(List<String> args) async {
final argumentParser = new ArgParser();
argumentParser.addOption('port', defaultsTo: '8080');
argumentParser.addOption('use_tls', defaultsTo: 'false');
argumentParser.addOption('tls_cert_file', defaultsTo: 'server1.pem');
argumentParser.addOption('tls_key_file', defaultsTo: 'server1.key');
final arguments = argumentParser.parse(args);
final port = int.parse(arguments['port']);
final server = new Server(port: port)..addService(new TestService());
final services = [new TestService()];
Server server;
if (arguments['use_tls'] == 'true') {
final certificate = new File(arguments['tls_cert_file']).readAsBytes();
final privateKey = new File(arguments['tls_key_file']).readAsBytes();
server = new Server.secure(services,
certificate: await certificate,
privateKey: await privateKey,
port: port);
} else {
server = new Server.insecure(services, port: port);
}
await server.serve();
print('Server listening...');
print('Server listening on port $port...');
}

16
interop/server1.key Normal file
View File

@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD
M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf
3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY
AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm
V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY
tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p
dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q
K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR
81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff
DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd
aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2
ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3
XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe
F98XJ7tIFfJq
-----END PRIVATE KEY-----

16
interop/server1.pem Normal file
View File

@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICnDCCAgWgAwIBAgIBBzANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJBVTET
MBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQ
dHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2EwHhcNMTUxMTA0MDIyMDI0WhcNMjUxMTAx
MDIyMDI0WjBlMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV
BAcTB0NoaWNhZ28xFTATBgNVBAoTDEV4YW1wbGUsIENvLjEaMBgGA1UEAxQRKi50
ZXN0Lmdvb2dsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAOHDFSco
LCVJpYDDM4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1Bg
zkWF+slf3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd
9N8YwbBYAckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAGjazBpMAkGA1UdEwQCMAAw
CwYDVR0PBAQDAgXgME8GA1UdEQRIMEaCECoudGVzdC5nb29nbGUuZnKCGHdhdGVy
em9vaS50ZXN0Lmdvb2dsZS5iZYISKi50ZXN0LnlvdXR1YmUuY29thwTAqAEDMA0G
CSqGSIb3DQEBCwUAA4GBAJFXVifQNub1LUP4JlnX5lXNlo8FxZ2a12AFQs+bzoJ6
hM044EDjqyxUqSbVePK0ni3w1fHQB5rY9yYC5f8G7aqqTY1QOhoUk8ZTSTRpnkTh
y4jjdvTZeLDVBlueZUTDRmy2feY5aZIU18vFDK08dTG0A87pppuv1LNIR3loveU8
-----END CERTIFICATE-----

View File

@ -21,23 +21,33 @@ const _reservedHeaders = const [
'user-agent',
];
/// Runtime options for a RPC call.
class CallOptions {
final Map<String, String> metadata;
final Duration timeout;
/// Options controlling how connections are made on a [ClientChannel].
class ChannelOptions {
final bool _useTls;
final List<int> _certificateBytes;
final String _certificatePassword;
CallOptions._(this.metadata, this.timeout);
const ChannelOptions._(this._useTls,
[this._certificateBytes, this._certificatePassword]);
factory CallOptions({Map<String, String> metadata, Duration timeout}) {
final sanitizedMetadata = <String, String>{};
metadata?.forEach((key, value) {
final lowerCaseKey = key.toLowerCase();
if (!lowerCaseKey.startsWith(':') &&
!_reservedHeaders.contains(lowerCaseKey)) {
sanitizedMetadata[lowerCaseKey] = value;
}
});
return new CallOptions._(new Map.unmodifiable(sanitizedMetadata), timeout);
/// Enable TLS using the default trust store.
const ChannelOptions() : this._(true);
/// Disable TLS. RPCs are sent in clear text.
const ChannelOptions.insecure() : this._(false);
/// Enable TLS and specify the [certificate]s to trust.
ChannelOptions.secure({List<int> certificate, String password})
: this._(true, certificate, password);
SecurityContext get securityContext {
if (!_useTls) return null;
final context = createSecurityContext(false);
if (_certificateBytes != null) {
context.setTrustedCertificatesBytes(_certificateBytes,
password: _certificatePassword);
}
return context;
}
}
@ -45,17 +55,26 @@ class CallOptions {
class ClientChannel {
final String host;
final int port;
final CallOptions options;
final ChannelOptions options;
final List<Socket> _sockets = [];
final List<TransportConnection> _connections = [];
ClientChannel(this.host, {this.port = 8080, this.options});
ClientChannel(this.host,
{this.port = 443, this.options = const ChannelOptions()});
/// Returns a connection to this [Channel]'s RPC endpoint. The connection may
/// be shared between multiple RPC calls.
Future<ClientTransportConnection> connect() async {
final socket = await Socket.connect(host, port);
final securityContext = options.securityContext;
Socket socket;
if (securityContext == null) {
socket = await Socket.connect(host, port);
} else {
socket = await SecureSocket.connect(host, port,
context: securityContext, supportedProtocols: ['grpc-exp', 'h2']);
}
_sockets.add(socket);
final connection = new ClientTransportConnection.viaSocket(socket);
_connections.add(connection);
@ -80,6 +99,49 @@ class ClientMethod<Q, R> {
ClientMethod(this.path, this.requestSerializer, this.responseDeserializer);
}
/// Runtime options for an RPC.
class CallOptions {
final Map<String, String> metadata;
final Duration timeout;
CallOptions._(this.metadata, this.timeout);
factory CallOptions({Map<String, String> metadata, Duration timeout}) {
final sanitizedMetadata = <String, String>{};
metadata?.forEach((key, value) {
final lowerCaseKey = key.toLowerCase();
if (!lowerCaseKey.startsWith(':') &&
!_reservedHeaders.contains(lowerCaseKey)) {
sanitizedMetadata[lowerCaseKey] = value;
}
});
return new CallOptions._(new Map.unmodifiable(sanitizedMetadata), timeout);
}
CallOptions mergedWith(CallOptions other) {
if (other == null) return this;
final mergedMetadata = new Map.from(metadata)..addAll(other.metadata);
final mergedTimeout = other.timeout ?? timeout;
return new CallOptions._(
new Map.unmodifiable(mergedMetadata), mergedTimeout);
}
}
/// Base class for client stubs.
class Client {
final ClientChannel _channel;
final CallOptions _options;
Client(this._channel, {CallOptions options})
: _options = options ?? new CallOptions();
ClientCall<Q, R> $createCall<Q, R>(ClientMethod<Q, R> method,
{CallOptions options}) {
return new ClientCall(_channel, method,
options: _options.mergedWith(options));
}
}
/// An active call to a gRPC endpoint.
class ClientCall<Q, R> implements Response {
static final _methodPost = new Header.ascii(':method', 'POST');
@ -112,7 +174,7 @@ class ClientCall<Q, R> implements Response {
ClientCall(this._channel, this._method, {this.options}) {
_responses = new StreamController(onListen: _onResponseListen);
final timeout = options?.timeout ?? _channel.options?.timeout;
final timeout = options?.timeout;
if (timeout != null) {
_timeoutTimer = new Timer(timeout, _onTimedOut);
}
@ -157,11 +219,8 @@ class ClientCall<Q, R> implements Response {
final timeoutString = toTimeoutString(timeout);
// TODO(jakobr): Flip this around, and have the Channel create the call
// object and apply options (including the above TODO).
final customMetadata = <String, String>{};
customMetadata.addAll(_channel.options?.metadata ?? {});
customMetadata.addAll(options?.metadata ?? {});
final headers = createCallHeaders(_method.path, _channel.host,
timeout: timeoutString, metadata: customMetadata);
timeout: timeoutString, metadata: options?.metadata);
_stream = connection.makeRequest(headers);
_requests.stream
.map(_method.requestSerializer)

View File

@ -53,28 +53,63 @@ abstract class Service {
/// A gRPC server.
///
/// Listens for incoming gRPC calls, dispatching them to a [ServerHandler].
/// Listens for incoming RPCs, dispatching them to the right [Service] handler.
class Server {
final Map<String, Service> _services = {};
ServerSocket _server;
final int port;
final SecurityContext _securityContext;
ServerSocket _insecureServer;
SecureServerSocket _secureServer;
final _connections = <ServerTransportConnection>[];
final int _port;
Server._(List<Service> services, this.port, this._securityContext) {
for (final service in services) {
_services[service.$name] = service;
}
}
Server({int port = 8080}) : _port = port;
/// Create a server for the given [services] with no transport security,
/// listening on [port].
factory Server.insecure(List<Service> services, {int port}) {
return new Server._(services, port ?? 80, null);
}
void addService(Service service) {
_services[service.$name] = service;
/// Create a secure server for the given [services], listening on [port].
///
/// If the [certificate] or [privateKey] is encrypted, the password must also
/// be provided.
factory Server.secure(List<Service> services,
{List<int> certificate,
String certificatePassword,
List<int> privateKey,
String privateKeyPassword,
int port}) {
final context = createSecurityContext(true);
if (privateKey != null) {
context.usePrivateKeyBytes(privateKey, password: privateKeyPassword);
}
if (certificate != null) {
context.useCertificateChainBytes(certificate,
password: certificatePassword);
}
return new Server._(services, port ?? 443, context);
}
Service lookupService(String service) => _services[service];
Future<Null> serve() async {
// TODO(dart-lang/grpc-dart#4): Add TLS support.
// TODO(dart-lang/grpc-dart#9): Handle HTTP/1.1 upgrade to h2c, if allowed.
_server = await ServerSocket.bind('0.0.0.0', _port);
_server.listen((socket) {
Stream<Socket> server;
if (_securityContext != null) {
_secureServer =
await SecureServerSocket.bind('0.0.0.0', port, _securityContext);
server = _secureServer;
} else {
_insecureServer = await ServerSocket.bind('0.0.0.0', port);
server = _insecureServer;
}
server.listen((socket) {
final connection = new ServerTransportConnection.viaSocket(socket);
_connections.add(connection);
connection.incomingStreams.listen(serveStream, onError: (error) {
@ -93,8 +128,11 @@ class Server {
Future<Null> shutdown() {
final done = _connections.map((connection) => connection.finish()).toList();
if (_server != null) {
done.add(_server.close());
if (_insecureServer != null) {
done.add(_insecureServer.close());
}
if (_secureServer != null) {
done.add(_secureServer.close());
}
return Future.wait(done);
}

View File

@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.
import 'dart:async';
import 'dart:io';
import 'package:async/async.dart';
import 'package:grpc/src/client.dart';
@ -122,3 +123,11 @@ abstract class _ResponseMixin<Q, R> implements Response {
@override
Future<Null> cancel() => _call.cancel();
}
// TODO: Simplify once we have a stable Dart 1.25 release (update pubspec to
// require SDK >=1.25.0, and remove check for alpnSupported).
SecurityContext createSecurityContext(bool isServer) =>
SecurityContext.alpnSupported
? (new SecurityContext()
..setAlpnProtocols(['grpc-exp', 'h2'], isServer))
: new SecurityContext();

BIN
test/data/certstore.p12 Normal file

Binary file not shown.

34
test/options_test.dart Normal file
View File

@ -0,0 +1,34 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.
import 'dart:io';
import 'package:grpc/grpc.dart';
import 'package:test/test.dart';
const isTlsException = const isInstanceOf<TlsException>();
void main() {
group('Certificates', () {
test('report password errors correctly', () async {
final certificate =
await new File('test/data/certstore.p12').readAsBytes();
final missingPassword =
new ChannelOptions.secure(certificate: certificate);
expect(() => missingPassword.securityContext, throwsA(isTlsException));
final wrongPassword = new ChannelOptions.secure(
certificate: certificate, password: 'wrong');
expect(() => wrongPassword.securityContext, throwsA(isTlsException));
final correctPassword = new ChannelOptions.secure(
certificate: certificate, password: 'correct');
expect(correctPassword.securityContext, isNotNull);
final channel = new ClientChannel('localhost', options: missingPassword);
expect(channel.connect(), throwsA(isTlsException));
});
});
}

View File

@ -96,7 +96,11 @@ class ServerHarness {
final toServer = new StreamController<StreamMessage>();
final fromServer = new StreamController<StreamMessage>();
final service = new TestService();
final server = new Server();
Server server;
ServerHarness() {
server = new Server.insecure([service]);
}
static ServiceMethod<int, int> createMethod(String name,
Function methodHandler, bool clientStreaming, bool serverStreaming) {
@ -105,7 +109,6 @@ class ServerHarness {
}
void setUp() {
server.addService(service);
final stream = new TestServerStream(toServer.stream, fromServer.sink);
server.serveStream(stream);
}