Add support for unix domain sockets (#327)

Fixes #299
This commit is contained in:
Lei Liu 2020-11-11 22:43:05 +08:00 committed by GitHub
parent 5b3a125914
commit 52bea07000
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 196 additions and 30 deletions

View File

@ -7,6 +7,8 @@
newer of protobuf compiler plugin.
* `Client.$createCall` is deprecated because it does not invoke client
interceptors.
* Change minimum required Dart SDK to 2.8 to enable access to Unix domain sockets.
* Add support for Unix domain sockets in `Socket.serve` and `ClientChannel`.
## 2.7.0
@ -19,7 +21,7 @@
## 2.6.0
* Create gRPC servers and clients with [Server|Client]TransportConnnection.
This allows callers to propvide their own transport configuration, such
This allows callers to provide their own transport configuration, such
as their own implementation of streams and sinks instead of sockets.
## 2.5.0

View File

@ -19,6 +19,7 @@ For complete documentation, see [Dart gRPC](https://grpc.io/docs/languages/dart)
- [Flutter](https://flutter.dev)
> **Note:** [grpc-web](https://github.com/grpc/grpc-web) is supported by `package:grpc/grpc_web.dart`.
> **UDS-unix domain socket** is supported with sdk version >= 2.8.0.
## Contributing

View File

@ -11,8 +11,9 @@ folder, i.e., .../example/helloworld/, first get the dependencies by running:
```sh
$ pub get
```
## Run TCP sample code
Then, to run the server:
Start the server:
```sh
$ dart bin/server.dart
@ -23,6 +24,21 @@ Likewise, to run the client:
```sh
$ dart bin/client.dart
```
## Run UDS sample code
Start the server:
```sh
$ dart bin/unix_server.dart
```
Likewise, to run the client:
```sh
$ dart bin/unix_client.dart
```
>**Note** the `UDS` only support *nix plantform.
# Regenerate the stubs

View File

@ -0,0 +1,42 @@
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Dart implementation of the gRPC helloworld.Greeter client.
import 'dart:io';
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pb.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
Future<void> main(List<String> args) async {
final udsAddress =
InternetAddress('localhost', type: InternetAddressType.unix);
final channel = ClientChannel(
udsAddress,
port: 0,
options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
);
final stub = GreeterClient(channel);
final name = args.isNotEmpty ? args[0] : 'world';
try {
final response = await stub.sayHello(HelloRequest()..name = name);
print('Greeter client received: ${response.message}');
} catch (e) {
print('Caught error: $e');
}
await channel.shutdown();
}

View File

@ -0,0 +1,36 @@
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Dart implementation of the gRPC helloworld.Greeter server.
import 'dart:io';
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pb.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
class GreeterService extends GreeterServiceBase {
@override
Future<HelloReply> sayHello(ServiceCall call, HelloRequest request) async {
return HelloReply()..message = 'Hello, ${request.name}!';
}
}
Future<void> main(List<String> args) async {
final udsAddress =
InternetAddress('localhost', type: InternetAddressType.unix);
final server = Server([GreeterService()]);
await server.serve(address: udsAddress);
print('Start UNIX Server @localhost...');
}

View File

@ -3,7 +3,7 @@ description: Dart gRPC sample client and server.
homepage: https://github.com/dart-lang/grpc-dart
environment:
sdk: '>=2.2.0 <3.0.0'
sdk: '>=2.8.0 <3.0.0'
dependencies:
async: ^2.2.0

View File

@ -3,7 +3,7 @@ description: Dart gRPC interoperability test suite.
homepage: https://github.com/dart-lang/grpc-dart
environment:
sdk: '>=2.2.0 <3.0.0'
sdk: '>=2.8.0 <3.0.0'
dependencies:
args: ^1.5.2

View File

@ -25,7 +25,10 @@ import 'options.dart';
/// RPCs on the same channel may be sent to different connections, depending on
/// load balancing settings.
class ClientChannel extends ClientChannelBase {
final String host;
/// Starts the [Server] with the given options.
/// [address] can be either a [String] or an [InternetAddress], in the latter
/// case it can be a Unix Domain Socket address.
final Object host;
final int port;
final ChannelOptions options;

View File

@ -61,7 +61,7 @@ class Http2ClientConnection implements connection.ClientConnection {
Duration _currentReconnectDelay;
Http2ClientConnection(String host, int port, this.options)
Http2ClientConnection(Object host, int port, this.options)
: _transportConnector = _SocketTransportConnector(host, port, options);
Http2ClientConnection.fromClientTransportConnector(
@ -298,7 +298,7 @@ class Http2ClientConnection implements connection.ClientConnection {
}
class _SocketTransportConnector implements ClientTransportConnector {
final String _host;
final Object _host;
final int _port;
final ChannelOptions _options;
Socket _socket;
@ -310,7 +310,9 @@ class _SocketTransportConnector implements ClientTransportConnector {
final securityContext = _options.credentials.securityContext;
_socket = await Socket.connect(_host, _port);
// Don't wait for io buffers to fill up before sending requests.
_socket.setOption(SocketOption.tcpNoDelay, true);
if (_socket.address.type != InternetAddressType.unix) {
_socket.setOption(SocketOption.tcpNoDelay, true);
}
if (securityContext != null) {
// Todo(sigurdm): We want to pass supportedProtocols: ['h2'].
// http://dartbug.com/37950

View File

@ -145,6 +145,11 @@ class Server extends ConnectionServer {
return null;
}
Service lookupService(String service) => _services[service];
/// Starts the [Server] with the given options.
/// [address] can be either a [String] or an [InternetAddress], in the latter
/// case it can be a Unix Domain Socket address.
Future<void> serve(
{dynamic address,
int port,
@ -173,7 +178,9 @@ class Server extends ConnectionServer {
}
server.listen((socket) {
// Don't wait for io buffers to fill up before sending requests.
socket.setOption(SocketOption.tcpNoDelay, true);
if (socket.address.type != InternetAddressType.unix) {
socket.setOption(SocketOption.tcpNoDelay, true);
}
final connection = ServerTransportConnection.viaSocket(socket,
settings: http2ServerSettings);
serveConnection(connection);

View File

@ -6,7 +6,7 @@ version: 2.8.0-dev
homepage: https://github.com/dart-lang/grpc-dart
environment:
sdk: '>=2.3.0 <3.0.0'
sdk: '>=2.8.0 <3.0.0'
dependencies:
async: ^2.2.0

View File

@ -1,5 +1,6 @@
@TestOn('vm')
import 'dart:async';
import 'dart:io';
import 'package:grpc/grpc.dart' as grpc;
import 'package:grpc/service_api.dart';
import 'package:grpc/src/client/channel.dart';
@ -7,6 +8,7 @@ import 'package:grpc/src/client/connection.dart';
import 'package:grpc/src/client/http2_connection.dart';
import 'package:http2/http2.dart';
import 'package:test/test.dart';
import 'common.dart';
class TestClient extends grpc.Client {
static final _$stream = grpc.ClientMethod<int, int>(
@ -47,12 +49,14 @@ class FixedConnectionClientChannel extends ClientChannelBase {
}
main() async {
test('client reconnects after the connection gets old', () async {
testTcpAndUds('client reconnects after the connection gets old',
(address) async {
// client reconnect after a short delay.
final grpc.Server server = grpc.Server([TestService()]);
await server.serve(address: 'localhost', port: 0);
await server.serve(address: address, port: 0);
final channel = FixedConnectionClientChannel(Http2ClientConnection(
'localhost',
address,
server.port,
grpc.ChannelOptions(
idleTimeout: Duration(minutes: 1),
@ -61,8 +65,8 @@ main() async {
credentials: grpc.ChannelCredentials.insecure(),
),
));
final testClient = TestClient(channel);
final testClient = TestClient(channel);
expect(await testClient.stream(1).toList(), [1, 2, 3]);
await Future.delayed(Duration(milliseconds: 200));
expect(await testClient.stream(1).toList(), [1, 2, 3]);
@ -71,15 +75,16 @@ main() async {
server.shutdown();
});
test('client reconnects when stream limit is used', () async {
testTcpAndUds('client reconnects when stream limit is used', (address) async {
// client reconnect after setting stream limit.
final grpc.Server server = grpc.Server([TestService()]);
await server.serve(
address: 'localhost',
address: address,
port: 0,
http2ServerSettings: ServerSettings(concurrentStreamLimit: 2));
final channel = FixedConnectionClientChannel(Http2ClientConnection(
'localhost',
address,
server.port,
grpc.ChannelOptions(credentials: grpc.ChannelCredentials.insecure())));
final states = <grpc.ConnectionState>[];

39
test/common.dart Normal file
View File

@ -0,0 +1,39 @@
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:io';
import 'dart:async';
import 'package:test/test.dart';
/// Test functionality for both TCP and Unix domain sockets.
void testTcpAndUds(
String name, FutureOr<void> Function(InternetAddress) testCase,
{String host = 'localhost'}) {
test(name, () async {
final address = await InternetAddress.lookup(host);
await testCase(address.first);
});
if (Platform.isWindows) {
return;
}
test('$name (over uds)', () async {
final tempDir = await Directory.systemTemp.createTemp();
final address = InternetAddress(tempDir.path + '/socket',
type: InternetAddressType.unix);
addTearDown(() => tempDir.delete(recursive: true));
await testCase(address);
});
}

View File

@ -7,6 +7,7 @@ import 'package:grpc/src/client/channel.dart' hide ClientChannel;
import 'package:grpc/src/client/connection.dart';
import 'package:grpc/src/client/http2_connection.dart';
import 'package:test/test.dart';
import 'common.dart';
class TestClient extends Client {
static final _$stream = ClientMethod<int, int>('/test.TestService/stream',
@ -56,12 +57,13 @@ class FixedConnectionClientChannel extends ClientChannelBase {
}
main() async {
test('round trip insecure connection', () async {
testTcpAndUds('round trip insecure connection', (address) async {
// round trip test of insecure connection.
final Server server = Server([TestService()]);
await server.serve(address: 'localhost', port: 0);
await server.serve(address: address, port: 0);
final channel = FixedConnectionClientChannel(Http2ClientConnection(
'localhost',
address,
server.port,
ChannelOptions(credentials: ChannelCredentials.insecure()),
));
@ -71,17 +73,18 @@ main() async {
server.shutdown();
});
test('round trip secure connection', () async {
testTcpAndUds('round trip secure connection', (address) async {
// round trip test of secure connection.
final Server server = Server([TestService()]);
await server.serve(
address: 'localhost',
address: address,
port: 0,
security: ServerTlsCredentials(
certificate: File('test/data/localhost.crt').readAsBytesSync(),
privateKey: File('test/data/localhost.key').readAsBytesSync()));
final channel = FixedConnectionClientChannel(Http2ClientConnection(
'localhost',
address,
server.port,
ChannelOptions(
credentials: ChannelCredentials.secure(

View File

@ -1,8 +1,10 @@
@TestOn('vm')
import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'package:grpc/grpc.dart' as grpc;
import 'package:test/test.dart';
import 'common.dart';
class TestClient extends grpc.Client {
static final _$infiniteStream = grpc.ClientMethod<int, int>(
@ -45,14 +47,15 @@ class TestService extends grpc.Service {
}
class ClientData {
final InternetAddress address;
final int port;
final SendPort sendPort;
ClientData({this.port, this.sendPort});
ClientData({this.address, this.port, this.sendPort});
}
void client(clientData) async {
final channel = grpc.ClientChannel(
'localhost',
clientData.address,
port: clientData.port,
options: const grpc.ChannelOptions(
credentials: grpc.ChannelCredentials.insecure(),
@ -66,21 +69,28 @@ void client(clientData) async {
}
main() async {
test("the client interrupting the connection does not crash the server",
() async {
testTcpAndUds(
"the client interrupting the connection does not crash the server",
(address) async {
// interrrupt the connect of client, the server does not crash.
grpc.Server server;
server = grpc.Server([
TestService(
finallyCallback: expectAsync0(() {
server.shutdown();
expect(server.shutdown(), completes);
}, reason: 'the producer should get cancelled'))
]);
await server.serve(address: 'localhost', port: 0);
await server.serve(address: address, port: 0);
final receivePort = ReceivePort();
Isolate.spawn(
client, ClientData(port: server.port, sendPort: receivePort.sendPort));
client,
ClientData(
address: address,
port: server.port,
sendPort: receivePort.sendPort));
receivePort.listen(expectAsync1((e) {
expect(e, isA<grpc.GrpcError>());
receivePort.close();
}, reason: 'the client should send an error from the destroyed channel'));
});
}