From c48af638a54225fe39200df726440ac7bde4225c Mon Sep 17 00:00:00 2001 From: Ankur Jain Date: Fri, 11 Dec 2020 00:52:24 -0800 Subject: [PATCH] Support message compression (#409) * Added support for compression/decompression, which can be configured through `ChannelOptions` constructor's `codecRegistry` parameter or adding the `grpc-accept-encoding` to `metadata` parameter of `CallOptions` on the client side and `codecRegistry` parameter to `Server` on the server side. Outgoing rpc can be compressed using the `compression` parameter on the `CallOptions`. Closes #6 --- CHANGELOG.md | 9 +++ example/helloworld/bin/client.dart | 12 +++- example/helloworld/bin/server.dart | 7 +- lib/grpc.dart | 6 +- lib/src/client/call.dart | 46 ++++++++++--- lib/src/client/http2_connection.dart | 54 ++++++++++----- lib/src/client/options.dart | 4 ++ lib/src/client/transport/http2_transport.dart | 14 ++-- lib/src/server/handler.dart | 36 +++++++--- lib/src/server/server.dart | 26 ++++--- lib/src/shared/codec.dart | 68 +++++++++++++++++++ lib/src/shared/codec_registry.dart | 50 ++++++++++++++ lib/src/shared/message.dart | 47 +++++++++---- pubspec.yaml | 3 +- test/client_tests/client_test.dart | 24 +++++++ test/round_trip_test.dart | 26 +++++++ test/shared_tests/codec_registry_test.dart | 15 ++++ test/src/client_utils.dart | 6 +- test/src/server_utils.dart | 5 +- test/src/utils.dart | 3 +- 20 files changed, 377 insertions(+), 84 deletions(-) create mode 100644 lib/src/shared/codec.dart create mode 100644 lib/src/shared/codec_registry.dart create mode 100644 test/shared_tests/codec_registry_test.dart diff --git a/CHANGELOG.md b/CHANGELOG.md index 94290a9..b3228f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## 2.9.0-dev + +* Added support for compression/decompression, which can be configured through + `ChannelOptions` constructor's `codecRegistry` parameter or adding the + `grpc-accept-encoding` to `metadata` parameter of `CallOptions` on the client + side and `codecRegistry` parameter to `Server` on the server side. + Outgoing rpc can be compressed using the `compression` parameter on the + `CallOptions`. + ## 2.8.0 * Added support for client interceptors, which can be configured through diff --git a/example/helloworld/bin/client.dart b/example/helloworld/bin/client.dart index 8511293..9c023a4 100644 --- a/example/helloworld/bin/client.dart +++ b/example/helloworld/bin/client.dart @@ -15,7 +15,6 @@ /// Dart implementation of the gRPC helloworld.Greeter client. import 'package:grpc/grpc.dart'; - import 'package:helloworld/src/generated/helloworld.pb.dart'; import 'package:helloworld/src/generated/helloworld.pbgrpc.dart'; @@ -23,14 +22,21 @@ Future main(List args) async { final channel = ClientChannel( 'localhost', port: 50051, - options: const ChannelOptions(credentials: ChannelCredentials.insecure()), + options: ChannelOptions( + credentials: ChannelCredentials.insecure(), + codecRegistry: + CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), + ), ); final stub = GreeterClient(channel); final name = args.isNotEmpty ? args[0] : 'world'; try { - final response = await stub.sayHello(HelloRequest()..name = name); + final response = await stub.sayHello( + HelloRequest()..name = name, + options: CallOptions(compression: const GzipCodec()), + ); print('Greeter client received: ${response.message}'); } catch (e) { print('Caught error: $e'); diff --git a/example/helloworld/bin/server.dart b/example/helloworld/bin/server.dart index 145b0a3..bc8afea 100644 --- a/example/helloworld/bin/server.dart +++ b/example/helloworld/bin/server.dart @@ -15,7 +15,6 @@ /// Dart implementation of the gRPC helloworld.Greeter server. import 'package:grpc/grpc.dart'; - import 'package:helloworld/src/generated/helloworld.pb.dart'; import 'package:helloworld/src/generated/helloworld.pbgrpc.dart'; @@ -27,7 +26,11 @@ class GreeterService extends GreeterServiceBase { } Future main(List args) async { - final server = Server([GreeterService()]); + final server = Server( + [GreeterService()], + const [], + CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]), + ); await server.serve(port: 50051); print('Server listening on port ${server.port}...'); } diff --git a/lib/grpc.dart b/lib/grpc.dart index 396386e..f927483 100644 --- a/lib/grpc.dart +++ b/lib/grpc.dart @@ -31,10 +31,8 @@ export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture; export 'src/client/connection.dart' show ConnectionState; export 'src/client/http2_channel.dart' show ClientChannel, ClientTransportConnectorChannel; - export 'src/client/interceptor.dart' show ClientInterceptor, ClientUnaryInvoker, ClientStreamingInvoker; - export 'src/client/method.dart' show ClientMethod; export 'src/client/options.dart' show @@ -57,11 +55,11 @@ export 'src/server/server.dart' ConnectionServer, Server; export 'src/server/service.dart' show ServiceMethod, Service; +export 'src/shared/codec.dart' show Codec, IdentityCodec, GzipCodec; +export 'src/shared/codec_registry.dart'; export 'src/shared/message.dart' show GrpcMessage, GrpcMetadata, GrpcData, grpcDecompressor; - export 'src/shared/profiler.dart' show isTimelineLoggingEnabled; - export 'src/shared/security.dart' show supportedAlpnProtocols, createSecurityContext; export 'src/shared/status.dart' show StatusCode, GrpcError; diff --git a/lib/src/client/call.dart b/lib/src/client/call.dart index a685c43..ba93bab 100644 --- a/lib/src/client/call.dart +++ b/lib/src/client/call.dart @@ -17,6 +17,7 @@ import 'dart:async'; import 'dart:convert'; import 'dart:developer'; +import 'package:grpc/grpc.dart'; import 'package:grpc/src/generated/google/rpc/status.pb.dart'; import 'package:meta/meta.dart'; import 'package:protobuf/protobuf.dart'; @@ -34,6 +35,7 @@ const _reservedHeaders = [ 'te', 'grpc-timeout', 'grpc-accept-encoding', + 'grpc-encoding', 'user-agent', ]; @@ -55,8 +57,14 @@ class CallOptions { final Map metadata; final Duration timeout; final List metadataProviders; + final Codec compression; - CallOptions._(this.metadata, this.timeout, this.metadataProviders); + CallOptions._( + this.metadata, + this.timeout, + this.metadataProviders, + this.compression, + ); /// Creates a [CallOptions] object. /// @@ -64,12 +72,18 @@ class CallOptions { /// configure per-RPC metadata [providers]. The metadata [providers] are /// invoked in order for every RPC, and can modify the outgoing metadata /// (including metadata provided by previous providers). - factory CallOptions( - {Map metadata, - Duration timeout, - List providers}) { - return CallOptions._(Map.unmodifiable(metadata ?? {}), timeout, - List.unmodifiable(providers ?? [])); + factory CallOptions({ + Map metadata, + Duration timeout, + List providers, + Codec compression, + }) { + return CallOptions._( + Map.unmodifiable(metadata ?? {}), + timeout, + List.unmodifiable(providers ?? []), + compression, + ); } factory CallOptions.from(Iterable options) => @@ -81,8 +95,13 @@ class CallOptions { final mergedTimeout = other.timeout ?? timeout; final mergedProviders = List.from(metadataProviders) ..addAll(other.metadataProviders); - return CallOptions._(Map.unmodifiable(mergedMetadata), mergedTimeout, - List.unmodifiable(mergedProviders)); + final mergedCompression = other.compression ?? compression; + return CallOptions._( + Map.unmodifiable(mergedMetadata), + mergedTimeout, + List.unmodifiable(mergedProviders), + mergedCompression, + ); } } @@ -109,7 +128,7 @@ class WebCallOptions extends CallOptions { WebCallOptions._(Map metadata, Duration timeout, List metadataProviders, {this.bypassCorsPreflight, this.withCredentials}) - : super._(metadata, timeout, metadataProviders); + : super._(metadata, timeout, metadataProviders, null); /// Creates a [WebCallOptions] object. /// @@ -246,7 +265,12 @@ class ClientCall implements Response { void _sendRequest(ClientConnection connection, Map metadata) { try { _stream = connection.makeRequest( - _method.path, options.timeout, metadata, _onRequestError); + _method.path, + options.timeout, + metadata, + _onRequestError, + callOptions: options, + ); } catch (e) { _terminateWithError(GrpcError.unavailable('Error making call: $e')); return; diff --git a/lib/src/client/http2_connection.dart b/lib/src/client/http2_connection.dart index 7ebf845..3be2852 100644 --- a/lib/src/client/http2_connection.dart +++ b/lib/src/client/http2_connection.dart @@ -20,13 +20,12 @@ import 'dart:io'; import 'package:http2/transport.dart'; import 'package:meta/meta.dart'; +import '../shared/codec.dart'; import '../shared/timeout.dart'; - import 'call.dart'; import 'client_transport_connector.dart'; import 'connection.dart' hide ClientConnection; import 'connection.dart' as connection; - import 'options.dart'; import 'transport/http2_credentials.dart'; import 'transport/http2_transport.dart'; @@ -39,8 +38,6 @@ class Http2ClientConnection implements connection.ClientConnection { static final _contentTypeGrpc = Header.ascii('content-type', 'application/grpc'); static final _teTrailers = Header.ascii('te', 'trailers'); - static final _grpcAcceptEncoding = - Header.ascii('grpc-accept-encoding', 'identity'); final ChannelOptions options; @@ -155,11 +152,25 @@ class Http2ClientConnection implements connection.ClientConnection { GrpcTransportStream makeRequest(String path, Duration timeout, Map metadata, ErrorHandler onRequestFailure, {CallOptions callOptions}) { - final headers = createCallHeaders(credentials.isSecure, - _transportConnector.authority, path, timeout, metadata, - userAgent: options.userAgent); + final compressionCodec = callOptions.compression; + final headers = createCallHeaders( + credentials.isSecure, + _transportConnector.authority, + path, + timeout, + metadata, + compressionCodec, + userAgent: options.userAgent, + grpcAcceptEncodings: callOptions.metadata['grpc-accept-encoding'] ?? + options.codecRegistry?.supportedEncodings, + ); final stream = _transportConnection.makeRequest(headers); - return Http2TransportStream(stream, onRequestFailure); + return Http2TransportStream( + stream, + onRequestFailure, + options.codecRegistry, + compressionCodec, + ); } void _startCall(ClientCall call) { @@ -272,24 +283,31 @@ class Http2ClientConnection implements connection.ClientConnection { _timer = Timer(_currentReconnectDelay, _handleReconnect); } - static List
createCallHeaders(bool useTls, String authority, - String path, Duration timeout, Map metadata, - {String userAgent}) { + static List
createCallHeaders( + bool useTls, + String authority, + String path, + Duration timeout, + Map metadata, + Codec compressionCodec, { + String userAgent, + String grpcAcceptEncodings, + }) { final headers = [ _methodPost, useTls ? _schemeHttps : _schemeHttp, Header(ascii.encode(':path'), utf8.encode(path)), Header(ascii.encode(':authority'), utf8.encode(authority)), - ]; - if (timeout != null) { - headers.add(Header.ascii('grpc-timeout', toTimeoutString(timeout))); - } - headers.addAll([ + if (timeout != null) + Header.ascii('grpc-timeout', toTimeoutString(timeout)), _contentTypeGrpc, _teTrailers, - _grpcAcceptEncoding, Header.ascii('user-agent', userAgent ?? defaultUserAgent), - ]); + if (grpcAcceptEncodings != null) + Header.ascii('grpc-accept-encoding', grpcAcceptEncodings), + if (compressionCodec != null) + Header.ascii('grpc-encoding', compressionCodec.encodingName) + ]; metadata?.forEach((key, value) { headers.add(Header(ascii.encode(key), utf8.encode(value))); }); diff --git a/lib/src/client/options.dart b/lib/src/client/options.dart index 2700ee7..2791948 100644 --- a/lib/src/client/options.dart +++ b/lib/src/client/options.dart @@ -14,6 +14,8 @@ // limitations under the License. import 'dart:math'; + +import '../shared/codec_registry.dart'; import 'transport/http2_credentials.dart'; const defaultIdleTimeout = Duration(minutes: 5); @@ -44,6 +46,7 @@ Duration defaultBackoffStrategy(Duration lastBackoff) { class ChannelOptions { final ChannelCredentials credentials; final Duration idleTimeout; + final CodecRegistry codecRegistry; /// The maximum time a single connection will be used for new requests. final Duration connectionTimeout; @@ -56,5 +59,6 @@ class ChannelOptions { this.userAgent = defaultUserAgent, this.backoffStrategy = defaultBackoffStrategy, this.connectionTimeout = defaultConnectionTimeOut, + this.codecRegistry, }); } diff --git a/lib/src/client/transport/http2_transport.dart b/lib/src/client/transport/http2_transport.dart index b7cf8e8..5ecda17 100644 --- a/lib/src/client/transport/http2_transport.dart +++ b/lib/src/client/transport/http2_transport.dart @@ -15,11 +15,11 @@ import 'dart:async'; +import 'package:grpc/grpc.dart'; import 'package:http2/transport.dart'; import '../../shared/message.dart'; import '../../shared/streams.dart'; - import 'transport.dart'; class Http2TransportStream extends GrpcTransportStream { @@ -30,12 +30,16 @@ class Http2TransportStream extends GrpcTransportStream { StreamSink> get outgoingMessages => _outgoingMessages.sink; - Http2TransportStream(this._transportStream, this._onError) - : incomingMessages = _transportStream.incomingMessages + Http2TransportStream( + this._transportStream, + this._onError, + CodecRegistry codecRegistry, + Codec compression, + ) : incomingMessages = _transportStream.incomingMessages .transform(GrpcHttpDecoder()) - .transform(grpcDecompressor()) { + .transform(grpcDecompressor(codecRegistry: codecRegistry)) { _outgoingMessages.stream - .map(frame) + .map((payload) => frame(payload, compression)) .map((bytes) => DataStreamMessage(bytes)) .handleError(_onError) .listen(_transportStream.outgoingMessages.add, diff --git a/lib/src/server/handler.dart b/lib/src/server/handler.dart index 5de4bbe..27a6a79 100644 --- a/lib/src/server/handler.dart +++ b/lib/src/server/handler.dart @@ -18,11 +18,12 @@ import 'dart:convert'; import 'package:http2/transport.dart'; +import '../shared/codec.dart'; +import '../shared/codec_registry.dart'; import '../shared/message.dart'; import '../shared/status.dart'; import '../shared/streams.dart'; import '../shared/timeout.dart'; - import 'call.dart'; import 'interceptor.dart'; import 'service.dart'; @@ -32,6 +33,7 @@ class ServerHandler_ extends ServiceCall { final ServerTransportStream _stream; final Service Function(String service) _serviceLookup; final List _interceptors; + final CodecRegistry _codecRegistry; StreamSubscription _incomingSubscription; @@ -39,6 +41,7 @@ class ServerHandler_ extends ServiceCall { ServiceMethod _descriptor; Map _clientMetadata; + Codec _callEncodingCodec; StreamController _requests; bool _hasReceivedRequest = false; @@ -55,7 +58,12 @@ class ServerHandler_ extends ServiceCall { bool _isTimedOut = false; Timer _timeoutTimer; - ServerHandler_(this._serviceLookup, this._stream, this._interceptors); + ServerHandler_( + this._serviceLookup, + this._stream, + this._interceptors, + this._codecRegistry, + ); DateTime get deadline => _deadline; @@ -74,7 +82,7 @@ class ServerHandler_ extends ServiceCall { _incomingSubscription = _stream.incomingMessages .transform(GrpcHttpDecoder()) - .transform(grpcDecompressor()) + .transform(grpcDecompressor(codecRegistry: _codecRegistry)) .listen(_onDataIdle, onError: _onError, onDone: _onDoneError, cancelOnError: true); _stream.outgoingMessages.done.then((_) { @@ -112,6 +120,13 @@ class ServerHandler_ extends ServiceCall { } final serviceName = pathSegments[1]; final methodName = pathSegments[2]; + if (_codecRegistry != null) { + final acceptedEncodings = + clientMetadata['grpc-accept-encoding']?.split(',') ?? []; + _callEncodingCodec = acceptedEncodings + .map(_codecRegistry.lookup) + .firstWhere((c) => c != null, orElse: () => null); + } _service = _serviceLookup(serviceName); _descriptor = _service?.$lookupMethod(methodName); @@ -251,7 +266,7 @@ class ServerHandler_ extends ServiceCall { if (!_headersSent) { sendHeaders(); } - _stream.sendData(frame(bytes)); + _stream.sendData(frame(bytes, _callEncodingCodec)); } catch (error) { final grpcError = GrpcError.internal('Error sending response: $error'); if (!_requests.isClosed) { @@ -285,7 +300,9 @@ class ServerHandler_ extends ServiceCall { // TODO(jakobr): Should come from package:http2? final outgoingHeadersMap = { ':status': '200', - 'content-type': 'application/grpc' + 'content-type': 'application/grpc', + if (_callEncodingCodec != null) + 'grpc-encoding': _callEncodingCodec.encodingName, }; outgoingHeadersMap.addAll(_customHeaders); @@ -384,7 +401,10 @@ class ServerHandler_ extends ServiceCall { } class ServerHandler extends ServerHandler_ { - ServerHandler(Service Function(String service) serviceLookup, stream, - [List interceptors = const []]) - : super(serviceLookup, stream, interceptors); + ServerHandler( + Service Function(String service) serviceLookup, + stream, [ + List interceptors = const [], + CodecRegistry codecRegistry, + ]) : super(serviceLookup, stream, interceptors, codecRegistry); } diff --git a/lib/src/server/server.dart b/lib/src/server/server.dart index 265ac52..dd5d95f 100644 --- a/lib/src/server/server.dart +++ b/lib/src/server/server.dart @@ -16,11 +16,11 @@ import 'dart:async'; import 'dart:io'; +import 'package:grpc/grpc.dart'; import 'package:http2/transport.dart'; import 'package:meta/meta.dart'; import '../shared/security.dart'; - import 'handler.dart'; import 'interceptor.dart'; import 'service.dart'; @@ -84,13 +84,17 @@ class ServerTlsCredentials extends ServerCredentials { class ConnectionServer { final Map _services = {}; final List _interceptors; + final CodecRegistry _codecRegistry; final _connections = []; /// Create a server for the given [services]. - ConnectionServer(List services, - [List interceptors = const []]) - : _interceptors = interceptors { + ConnectionServer( + List services, [ + List interceptors = const [], + CodecRegistry codecRegistry, + ]) : _codecRegistry = codecRegistry, + _interceptors = interceptors { for (final service in services) { _services[service.$name] = service; } @@ -121,7 +125,8 @@ class ConnectionServer { @visibleForTesting ServerHandler_ serveStream_(ServerTransportStream stream) { - return ServerHandler_(lookupService, stream, _interceptors)..handle(); + return ServerHandler_(lookupService, stream, _interceptors, _codecRegistry) + ..handle(); } } @@ -133,9 +138,11 @@ class Server extends ConnectionServer { SecureServerSocket _secureServer; /// Create a server for the given [services]. - Server(List services, - [List interceptors = const []]) - : super(services, interceptors); + Server( + List services, [ + List interceptors = const [], + CodecRegistry codecRegistry, + ]) : super(services, interceptors, codecRegistry); /// The port that the server is listening on, or `null` if the server is not /// active. @@ -193,7 +200,8 @@ class Server extends ConnectionServer { @visibleForTesting ServerHandler_ serveStream_(ServerTransportStream stream) { - return ServerHandler_(lookupService, stream, _interceptors)..handle(); + return ServerHandler_(lookupService, stream, _interceptors, _codecRegistry) + ..handle(); } @Deprecated( diff --git a/lib/src/shared/codec.dart b/lib/src/shared/codec.dart new file mode 100644 index 0000000..c9566bf --- /dev/null +++ b/lib/src/shared/codec.dart @@ -0,0 +1,68 @@ +// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'package:archive/archive.dart'; + +abstract class Codec { + /// Returns the message encoding that this compressor uses. + /// + /// This can be values such as "gzip", "deflate", "snappy", etc. + String get encodingName; + + /// Wraps an existing output stream with a compressed output. + List compress(List data); + + /// Wraps an existing output stream with a uncompressed input data. + List decompress(List data); +} + +/// The "identity", or "none" codec. +/// +/// This codec is special in that it can be used to explicitly disable Call +/// compression on a Channel that by default compresses. +class IdentityCodec implements Codec { + const IdentityCodec(); + + @override + String get encodingName => "identity"; + + @override + List compress(List data) { + return data; + } + + @override + List decompress(List data) { + return data; + } +} + +/// A gzip compressor and decompressor. +class GzipCodec implements Codec { + const GzipCodec(); + + @override + String get encodingName => "gzip"; + + @override + List compress(List data) { + return GZipEncoder().encode(data); + } + + @override + List decompress(List data) { + return GZipDecoder().decodeBytes(data); + } +} diff --git a/lib/src/shared/codec_registry.dart b/lib/src/shared/codec_registry.dart new file mode 100644 index 0000000..368348c --- /dev/null +++ b/lib/src/shared/codec_registry.dart @@ -0,0 +1,50 @@ +// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'codec.dart'; + +/// Encloses classes related to the compression and decompression of messages. +class CodecRegistry { + CodecRegistry({List codecs = const [IdentityCodec()]}) + : assert(codecs != null), + _codecs = Map.fromIterable(codecs, key: (c) => c.encodingName), + _supportedEncodings = codecs.map((c) { + if (c.encodingName.contains(',')) { + throw ArgumentError.value(c.encodingName, 'codecs', + 'contains entries with names containing ","'); + } + return c.encodingName; + }).join(',') { + if (_codecs.length != codecs.length) { + throw ArgumentError.value( + codecs, 'codecs', 'contains multiple entries with the same name'); + } + } + + factory CodecRegistry.empty() { + return CodecRegistry(codecs: []); + } + + /// Key refers to the `encodingName` param from the [Codec]. + final Map _codecs; + + final String _supportedEncodings; + + Codec lookup(String codecName) { + return _codecs[codecName]; + } + + String get supportedEncodings => _supportedEncodings; +} diff --git a/lib/src/shared/message.dart b/lib/src/shared/message.dart index 9a4f199..dcf251b 100644 --- a/lib/src/shared/message.dart +++ b/lib/src/shared/message.dart @@ -16,6 +16,10 @@ import 'dart:async'; import 'dart:typed_data'; +import 'codec.dart'; +import 'codec_registry.dart'; +import 'status.dart'; + abstract class GrpcMessage {} class GrpcMetadata extends GrpcMessage { @@ -54,25 +58,38 @@ class GrpcMessageSink extends Sink { } } -List frame(List payload) { - final payloadLength = payload.length; +List frame(List rawPayload, [Codec codec]) { + final compressedPayload = + codec == null ? rawPayload : codec.compress(rawPayload); + final payloadLength = compressedPayload.length; final bytes = Uint8List(payloadLength + 5); final header = bytes.buffer.asByteData(0, 5); - header.setUint8(0, 0); // TODO(dart-lang/grpc-dart#6): Handle compression + header.setUint8(0, codec == null ? 0 : 1); header.setUint32(1, payloadLength); - bytes.setRange(5, bytes.length, payload); + bytes.setRange(5, bytes.length, compressedPayload); return bytes; } -StreamTransformer grpcDecompressor() => - StreamTransformer.fromHandlers( - handleData: (GrpcMessage value, EventSink sink) { - if (value is GrpcData) { - if (value.isCompressed) { - // TODO(dart-lang/grpc-dart#6): Actually handle decompression. - sink.add(GrpcData(value.data, isCompressed: false)); - return; - } +StreamTransformer grpcDecompressor({ + CodecRegistry codecRegistry, +}) { + Codec codec; + return StreamTransformer.fromHandlers( + handleData: (GrpcMessage value, EventSink sink) { + if (value is GrpcData && value.isCompressed) { + if (codec == null) { + sink.addError( + GrpcError.unimplemented('Compression mechanism not supported'), + ); + return; } - sink.add(value); - }); + sink.add(GrpcData(codec.decompress(value.data), isCompressed: false)); + return; + } + + if (value is GrpcMetadata && value.metadata.containsKey('grpc-encoding')) { + codec = codecRegistry?.lookup(value.metadata['grpc-encoding']); + } + sink.add(value); + }); +} diff --git a/pubspec.yaml b/pubspec.yaml index 34b397a..dfae6cf 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -1,7 +1,7 @@ name: grpc description: Dart implementation of gRPC, a high performance, open-source universal RPC framework. -version: 2.8.0 +version: 2.9.0-dev homepage: https://github.com/dart-lang/grpc-dart @@ -9,6 +9,7 @@ environment: sdk: '>=2.8.0 <3.0.0' dependencies: + archive: ^2.0.13 async: ^2.2.0 crypto: ^2.1.4 fixnum: ^0.10.11 diff --git a/test/client_tests/client_test.dart b/test/client_tests/client_test.dart index 1bb4753..b46feac 100644 --- a/test/client_tests/client_test.dart +++ b/test/client_tests/client_test.dart @@ -63,6 +63,30 @@ void main() { ); }); + test('Unary call attaches encoding headers', () async { + const requestValue = 17; + const responseValue = 19; + + void handleRequest(StreamMessage message) { + final data = validateDataMessage(message); + expect(mockDecode(data.data), requestValue); + + harness + ..sendResponseHeader() + ..sendResponseValue(responseValue) + ..sendResponseTrailer(); + } + + await harness.runTest( + clientCall: harness.client.unary(requestValue, + options: CallOptions(metadata: {'grpc-accept-encoding': 'gzip'})), + expectedResult: responseValue, + expectedCustomHeaders: {'grpc-accept-encoding': 'gzip'}, + expectedPath: '/Test/Unary', + serverHandlers: [handleRequest], + ); + }); + test('Client-streaming calls work on the client', () async { const requests = [17, 3]; const response = 12; diff --git a/test/round_trip_test.dart b/test/round_trip_test.dart index 3afb1cc..5c07d96 100644 --- a/test/round_trip_test.dart +++ b/test/round_trip_test.dart @@ -1,12 +1,14 @@ @TestOn('vm') import 'dart:async'; import 'dart:io'; + import 'package:grpc/grpc.dart'; import 'package:grpc/service_api.dart' as api; import 'package:grpc/src/client/channel.dart' hide ClientChannel; import 'package:grpc/src/client/connection.dart'; import 'package:grpc/src/client/http2_connection.dart'; import 'package:test/test.dart'; + import 'common.dart'; class TestClient extends Client { @@ -73,6 +75,30 @@ main() async { server.shutdown(); }); + testTcpAndUds('round trip with outgoing and incoming compression', + (address) async { + final Server server = Server( + [TestService()], const [], CodecRegistry(codecs: const [GzipCodec()])); + await server.serve(address: address, port: 0); + + final channel = FixedConnectionClientChannel(Http2ClientConnection( + address, + server.port, + ChannelOptions( + credentials: ChannelCredentials.insecure(), + codecRegistry: CodecRegistry(codecs: const [GzipCodec()]), + ), + )); + final testClient = TestClient(channel); + expect( + await testClient + .stream(TestService.requestFiniteStream, + options: CallOptions(compression: const GzipCodec())) + .toList(), + [1, 2, 3]); + await server.shutdown(); + }); + testTcpAndUds('round trip secure connection', (address) async { // round trip test of secure connection. final Server server = Server([TestService()]); diff --git a/test/shared_tests/codec_registry_test.dart b/test/shared_tests/codec_registry_test.dart new file mode 100644 index 0000000..5a5483b --- /dev/null +++ b/test/shared_tests/codec_registry_test.dart @@ -0,0 +1,15 @@ +import 'package:grpc/src/shared/codec.dart'; +import 'package:grpc/src/shared/codec_registry.dart'; +import 'package:test/test.dart'; + +void main() { + test('CodecRegistry register adds new encodings', () { + final registry = CodecRegistry(); + expect(registry.supportedEncodings, 'identity'); + }); + + test('CodecRegistry lookup', () { + final registry = CodecRegistry(); + expect(registry.lookup('identity'), const IdentityCodec()); + }); +} diff --git a/test/src/client_utils.dart b/test/src/client_utils.dart index f978b96..6597247 100644 --- a/test/src/client_utils.dart +++ b/test/src/client_utils.dart @@ -16,14 +16,13 @@ import 'dart:async'; import 'dart:convert'; +import 'package:grpc/grpc.dart'; import 'package:grpc/src/client/channel.dart' as base; import 'package:grpc/src/client/http2_connection.dart'; import 'package:grpc/src/shared/message.dart'; import 'package:http2/transport.dart'; -import 'package:test/test.dart'; import 'package:mockito/mockito.dart'; - -import 'package:grpc/grpc.dart'; +import 'package:test/test.dart'; import 'utils.dart'; @@ -69,6 +68,7 @@ class FakeChannelOptions implements ChannelOptions { Duration connectionTimeout = const Duration(seconds: 10); String userAgent = 'dart-grpc/1.0.0 test'; BackoffStrategy backoffStrategy = testBackoff; + CodecRegistry codecRegistry = CodecRegistry.empty(); } class FakeChannel extends ClientChannel { diff --git a/test/src/server_utils.dart b/test/src/server_utils.dart index 691d85b..546a2bd 100644 --- a/test/src/server_utils.dart +++ b/test/src/server_utils.dart @@ -15,13 +15,12 @@ import 'dart:async'; +import 'package:grpc/grpc.dart'; import 'package:grpc/src/client/http2_connection.dart'; import 'package:grpc/src/shared/message.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; -import 'package:grpc/grpc.dart'; - import 'utils.dart'; class TestService extends Service { @@ -189,7 +188,7 @@ abstract class _Harness { Map metadata, Duration timeout}) { final headers = Http2ClientConnection.createCallHeaders( - true, authority, path, timeout, metadata, + true, authority, path, timeout, metadata, null, userAgent: 'dart-grpc/1.0.0 test'); toServer.add(HeadersStreamMessage(headers)); } diff --git a/test/src/utils.dart b/test/src/utils.dart index c55d32e..2d833cb 100644 --- a/test/src/utils.dart +++ b/test/src/utils.dart @@ -15,8 +15,8 @@ import 'dart:convert'; -import 'package:grpc/src/shared/streams.dart'; import 'package:grpc/src/shared/message.dart'; +import 'package:grpc/src/shared/streams.dart'; import 'package:http2/transport.dart'; import 'package:test/test.dart'; @@ -44,7 +44,6 @@ void validateRequestHeaders(Map headers, expect(headers['grpc-timeout'], timeout); expect(headers['content-type'], 'application/grpc'); expect(headers['te'], 'trailers'); - expect(headers['grpc-accept-encoding'], 'identity'); expect(headers['user-agent'], startsWith('dart-grpc/')); customHeaders?.forEach((key, value) {