mirror of https://github.com/grpc/grpc-dart.git
Support message compression (#409)
* Added support for compression/decompression, which can be configured through `ChannelOptions` constructor's `codecRegistry` parameter or adding the `grpc-accept-encoding` to `metadata` parameter of `CallOptions` on the client side and `codecRegistry` parameter to `Server` on the server side. Outgoing rpc can be compressed using the `compression` parameter on the `CallOptions`. Closes #6
This commit is contained in:
parent
e51c5a3d5d
commit
c48af638a5
|
@ -1,3 +1,12 @@
|
|||
## 2.9.0-dev
|
||||
|
||||
* Added support for compression/decompression, which can be configured through
|
||||
`ChannelOptions` constructor's `codecRegistry` parameter or adding the
|
||||
`grpc-accept-encoding` to `metadata` parameter of `CallOptions` on the client
|
||||
side and `codecRegistry` parameter to `Server` on the server side.
|
||||
Outgoing rpc can be compressed using the `compression` parameter on the
|
||||
`CallOptions`.
|
||||
|
||||
## 2.8.0
|
||||
|
||||
* Added support for client interceptors, which can be configured through
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
|
||||
/// Dart implementation of the gRPC helloworld.Greeter client.
|
||||
import 'package:grpc/grpc.dart';
|
||||
|
||||
import 'package:helloworld/src/generated/helloworld.pb.dart';
|
||||
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
|
||||
|
||||
|
@ -23,14 +22,21 @@ Future<void> main(List<String> args) async {
|
|||
final channel = ClientChannel(
|
||||
'localhost',
|
||||
port: 50051,
|
||||
options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
|
||||
options: ChannelOptions(
|
||||
credentials: ChannelCredentials.insecure(),
|
||||
codecRegistry:
|
||||
CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
|
||||
),
|
||||
);
|
||||
final stub = GreeterClient(channel);
|
||||
|
||||
final name = args.isNotEmpty ? args[0] : 'world';
|
||||
|
||||
try {
|
||||
final response = await stub.sayHello(HelloRequest()..name = name);
|
||||
final response = await stub.sayHello(
|
||||
HelloRequest()..name = name,
|
||||
options: CallOptions(compression: const GzipCodec()),
|
||||
);
|
||||
print('Greeter client received: ${response.message}');
|
||||
} catch (e) {
|
||||
print('Caught error: $e');
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
|
||||
/// Dart implementation of the gRPC helloworld.Greeter server.
|
||||
import 'package:grpc/grpc.dart';
|
||||
|
||||
import 'package:helloworld/src/generated/helloworld.pb.dart';
|
||||
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
|
||||
|
||||
|
@ -27,7 +26,11 @@ class GreeterService extends GreeterServiceBase {
|
|||
}
|
||||
|
||||
Future<void> main(List<String> args) async {
|
||||
final server = Server([GreeterService()]);
|
||||
final server = Server(
|
||||
[GreeterService()],
|
||||
const <Interceptor>[],
|
||||
CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
|
||||
);
|
||||
await server.serve(port: 50051);
|
||||
print('Server listening on port ${server.port}...');
|
||||
}
|
||||
|
|
|
@ -31,10 +31,8 @@ export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
|
|||
export 'src/client/connection.dart' show ConnectionState;
|
||||
export 'src/client/http2_channel.dart'
|
||||
show ClientChannel, ClientTransportConnectorChannel;
|
||||
|
||||
export 'src/client/interceptor.dart'
|
||||
show ClientInterceptor, ClientUnaryInvoker, ClientStreamingInvoker;
|
||||
|
||||
export 'src/client/method.dart' show ClientMethod;
|
||||
export 'src/client/options.dart'
|
||||
show
|
||||
|
@ -57,11 +55,11 @@ export 'src/server/server.dart'
|
|||
ConnectionServer,
|
||||
Server;
|
||||
export 'src/server/service.dart' show ServiceMethod, Service;
|
||||
export 'src/shared/codec.dart' show Codec, IdentityCodec, GzipCodec;
|
||||
export 'src/shared/codec_registry.dart';
|
||||
export 'src/shared/message.dart'
|
||||
show GrpcMessage, GrpcMetadata, GrpcData, grpcDecompressor;
|
||||
|
||||
export 'src/shared/profiler.dart' show isTimelineLoggingEnabled;
|
||||
|
||||
export 'src/shared/security.dart'
|
||||
show supportedAlpnProtocols, createSecurityContext;
|
||||
export 'src/shared/status.dart' show StatusCode, GrpcError;
|
||||
|
|
|
@ -17,6 +17,7 @@ import 'dart:async';
|
|||
import 'dart:convert';
|
||||
import 'dart:developer';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:grpc/src/generated/google/rpc/status.pb.dart';
|
||||
import 'package:meta/meta.dart';
|
||||
import 'package:protobuf/protobuf.dart';
|
||||
|
@ -34,6 +35,7 @@ const _reservedHeaders = [
|
|||
'te',
|
||||
'grpc-timeout',
|
||||
'grpc-accept-encoding',
|
||||
'grpc-encoding',
|
||||
'user-agent',
|
||||
];
|
||||
|
||||
|
@ -55,8 +57,14 @@ class CallOptions {
|
|||
final Map<String, String> metadata;
|
||||
final Duration timeout;
|
||||
final List<MetadataProvider> metadataProviders;
|
||||
final Codec compression;
|
||||
|
||||
CallOptions._(this.metadata, this.timeout, this.metadataProviders);
|
||||
CallOptions._(
|
||||
this.metadata,
|
||||
this.timeout,
|
||||
this.metadataProviders,
|
||||
this.compression,
|
||||
);
|
||||
|
||||
/// Creates a [CallOptions] object.
|
||||
///
|
||||
|
@ -64,12 +72,18 @@ class CallOptions {
|
|||
/// configure per-RPC metadata [providers]. The metadata [providers] are
|
||||
/// invoked in order for every RPC, and can modify the outgoing metadata
|
||||
/// (including metadata provided by previous providers).
|
||||
factory CallOptions(
|
||||
{Map<String, String> metadata,
|
||||
Duration timeout,
|
||||
List<MetadataProvider> providers}) {
|
||||
return CallOptions._(Map.unmodifiable(metadata ?? {}), timeout,
|
||||
List.unmodifiable(providers ?? []));
|
||||
factory CallOptions({
|
||||
Map<String, String> metadata,
|
||||
Duration timeout,
|
||||
List<MetadataProvider> providers,
|
||||
Codec compression,
|
||||
}) {
|
||||
return CallOptions._(
|
||||
Map.unmodifiable(metadata ?? {}),
|
||||
timeout,
|
||||
List.unmodifiable(providers ?? []),
|
||||
compression,
|
||||
);
|
||||
}
|
||||
|
||||
factory CallOptions.from(Iterable<CallOptions> options) =>
|
||||
|
@ -81,8 +95,13 @@ class CallOptions {
|
|||
final mergedTimeout = other.timeout ?? timeout;
|
||||
final mergedProviders = List.from(metadataProviders)
|
||||
..addAll(other.metadataProviders);
|
||||
return CallOptions._(Map.unmodifiable(mergedMetadata), mergedTimeout,
|
||||
List.unmodifiable(mergedProviders));
|
||||
final mergedCompression = other.compression ?? compression;
|
||||
return CallOptions._(
|
||||
Map.unmodifiable(mergedMetadata),
|
||||
mergedTimeout,
|
||||
List.unmodifiable(mergedProviders),
|
||||
mergedCompression,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -109,7 +128,7 @@ class WebCallOptions extends CallOptions {
|
|||
WebCallOptions._(Map<String, String> metadata, Duration timeout,
|
||||
List<MetadataProvider> metadataProviders,
|
||||
{this.bypassCorsPreflight, this.withCredentials})
|
||||
: super._(metadata, timeout, metadataProviders);
|
||||
: super._(metadata, timeout, metadataProviders, null);
|
||||
|
||||
/// Creates a [WebCallOptions] object.
|
||||
///
|
||||
|
@ -246,7 +265,12 @@ class ClientCall<Q, R> implements Response {
|
|||
void _sendRequest(ClientConnection connection, Map<String, String> metadata) {
|
||||
try {
|
||||
_stream = connection.makeRequest(
|
||||
_method.path, options.timeout, metadata, _onRequestError);
|
||||
_method.path,
|
||||
options.timeout,
|
||||
metadata,
|
||||
_onRequestError,
|
||||
callOptions: options,
|
||||
);
|
||||
} catch (e) {
|
||||
_terminateWithError(GrpcError.unavailable('Error making call: $e'));
|
||||
return;
|
||||
|
|
|
@ -20,13 +20,12 @@ import 'dart:io';
|
|||
import 'package:http2/transport.dart';
|
||||
import 'package:meta/meta.dart';
|
||||
|
||||
import '../shared/codec.dart';
|
||||
import '../shared/timeout.dart';
|
||||
|
||||
import 'call.dart';
|
||||
import 'client_transport_connector.dart';
|
||||
import 'connection.dart' hide ClientConnection;
|
||||
import 'connection.dart' as connection;
|
||||
|
||||
import 'options.dart';
|
||||
import 'transport/http2_credentials.dart';
|
||||
import 'transport/http2_transport.dart';
|
||||
|
@ -39,8 +38,6 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
static final _contentTypeGrpc =
|
||||
Header.ascii('content-type', 'application/grpc');
|
||||
static final _teTrailers = Header.ascii('te', 'trailers');
|
||||
static final _grpcAcceptEncoding =
|
||||
Header.ascii('grpc-accept-encoding', 'identity');
|
||||
|
||||
final ChannelOptions options;
|
||||
|
||||
|
@ -155,11 +152,25 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
GrpcTransportStream makeRequest(String path, Duration timeout,
|
||||
Map<String, String> metadata, ErrorHandler onRequestFailure,
|
||||
{CallOptions callOptions}) {
|
||||
final headers = createCallHeaders(credentials.isSecure,
|
||||
_transportConnector.authority, path, timeout, metadata,
|
||||
userAgent: options.userAgent);
|
||||
final compressionCodec = callOptions.compression;
|
||||
final headers = createCallHeaders(
|
||||
credentials.isSecure,
|
||||
_transportConnector.authority,
|
||||
path,
|
||||
timeout,
|
||||
metadata,
|
||||
compressionCodec,
|
||||
userAgent: options.userAgent,
|
||||
grpcAcceptEncodings: callOptions.metadata['grpc-accept-encoding'] ??
|
||||
options.codecRegistry?.supportedEncodings,
|
||||
);
|
||||
final stream = _transportConnection.makeRequest(headers);
|
||||
return Http2TransportStream(stream, onRequestFailure);
|
||||
return Http2TransportStream(
|
||||
stream,
|
||||
onRequestFailure,
|
||||
options.codecRegistry,
|
||||
compressionCodec,
|
||||
);
|
||||
}
|
||||
|
||||
void _startCall(ClientCall call) {
|
||||
|
@ -272,24 +283,31 @@ class Http2ClientConnection implements connection.ClientConnection {
|
|||
_timer = Timer(_currentReconnectDelay, _handleReconnect);
|
||||
}
|
||||
|
||||
static List<Header> createCallHeaders(bool useTls, String authority,
|
||||
String path, Duration timeout, Map<String, String> metadata,
|
||||
{String userAgent}) {
|
||||
static List<Header> createCallHeaders(
|
||||
bool useTls,
|
||||
String authority,
|
||||
String path,
|
||||
Duration timeout,
|
||||
Map<String, String> metadata,
|
||||
Codec compressionCodec, {
|
||||
String userAgent,
|
||||
String grpcAcceptEncodings,
|
||||
}) {
|
||||
final headers = [
|
||||
_methodPost,
|
||||
useTls ? _schemeHttps : _schemeHttp,
|
||||
Header(ascii.encode(':path'), utf8.encode(path)),
|
||||
Header(ascii.encode(':authority'), utf8.encode(authority)),
|
||||
];
|
||||
if (timeout != null) {
|
||||
headers.add(Header.ascii('grpc-timeout', toTimeoutString(timeout)));
|
||||
}
|
||||
headers.addAll([
|
||||
if (timeout != null)
|
||||
Header.ascii('grpc-timeout', toTimeoutString(timeout)),
|
||||
_contentTypeGrpc,
|
||||
_teTrailers,
|
||||
_grpcAcceptEncoding,
|
||||
Header.ascii('user-agent', userAgent ?? defaultUserAgent),
|
||||
]);
|
||||
if (grpcAcceptEncodings != null)
|
||||
Header.ascii('grpc-accept-encoding', grpcAcceptEncodings),
|
||||
if (compressionCodec != null)
|
||||
Header.ascii('grpc-encoding', compressionCodec.encodingName)
|
||||
];
|
||||
metadata?.forEach((key, value) {
|
||||
headers.add(Header(ascii.encode(key), utf8.encode(value)));
|
||||
});
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
// limitations under the License.
|
||||
|
||||
import 'dart:math';
|
||||
|
||||
import '../shared/codec_registry.dart';
|
||||
import 'transport/http2_credentials.dart';
|
||||
|
||||
const defaultIdleTimeout = Duration(minutes: 5);
|
||||
|
@ -44,6 +46,7 @@ Duration defaultBackoffStrategy(Duration lastBackoff) {
|
|||
class ChannelOptions {
|
||||
final ChannelCredentials credentials;
|
||||
final Duration idleTimeout;
|
||||
final CodecRegistry codecRegistry;
|
||||
|
||||
/// The maximum time a single connection will be used for new requests.
|
||||
final Duration connectionTimeout;
|
||||
|
@ -56,5 +59,6 @@ class ChannelOptions {
|
|||
this.userAgent = defaultUserAgent,
|
||||
this.backoffStrategy = defaultBackoffStrategy,
|
||||
this.connectionTimeout = defaultConnectionTimeOut,
|
||||
this.codecRegistry,
|
||||
});
|
||||
}
|
||||
|
|
|
@ -15,11 +15,11 @@
|
|||
|
||||
import 'dart:async';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:http2/transport.dart';
|
||||
|
||||
import '../../shared/message.dart';
|
||||
import '../../shared/streams.dart';
|
||||
|
||||
import 'transport.dart';
|
||||
|
||||
class Http2TransportStream extends GrpcTransportStream {
|
||||
|
@ -30,12 +30,16 @@ class Http2TransportStream extends GrpcTransportStream {
|
|||
|
||||
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
|
||||
|
||||
Http2TransportStream(this._transportStream, this._onError)
|
||||
: incomingMessages = _transportStream.incomingMessages
|
||||
Http2TransportStream(
|
||||
this._transportStream,
|
||||
this._onError,
|
||||
CodecRegistry codecRegistry,
|
||||
Codec compression,
|
||||
) : incomingMessages = _transportStream.incomingMessages
|
||||
.transform(GrpcHttpDecoder())
|
||||
.transform(grpcDecompressor()) {
|
||||
.transform(grpcDecompressor(codecRegistry: codecRegistry)) {
|
||||
_outgoingMessages.stream
|
||||
.map(frame)
|
||||
.map((payload) => frame(payload, compression))
|
||||
.map<StreamMessage>((bytes) => DataStreamMessage(bytes))
|
||||
.handleError(_onError)
|
||||
.listen(_transportStream.outgoingMessages.add,
|
||||
|
|
|
@ -18,11 +18,12 @@ import 'dart:convert';
|
|||
|
||||
import 'package:http2/transport.dart';
|
||||
|
||||
import '../shared/codec.dart';
|
||||
import '../shared/codec_registry.dart';
|
||||
import '../shared/message.dart';
|
||||
import '../shared/status.dart';
|
||||
import '../shared/streams.dart';
|
||||
import '../shared/timeout.dart';
|
||||
|
||||
import 'call.dart';
|
||||
import 'interceptor.dart';
|
||||
import 'service.dart';
|
||||
|
@ -32,6 +33,7 @@ class ServerHandler_ extends ServiceCall {
|
|||
final ServerTransportStream _stream;
|
||||
final Service Function(String service) _serviceLookup;
|
||||
final List<Interceptor> _interceptors;
|
||||
final CodecRegistry _codecRegistry;
|
||||
|
||||
StreamSubscription<GrpcMessage> _incomingSubscription;
|
||||
|
||||
|
@ -39,6 +41,7 @@ class ServerHandler_ extends ServiceCall {
|
|||
ServiceMethod _descriptor;
|
||||
|
||||
Map<String, String> _clientMetadata;
|
||||
Codec _callEncodingCodec;
|
||||
|
||||
StreamController _requests;
|
||||
bool _hasReceivedRequest = false;
|
||||
|
@ -55,7 +58,12 @@ class ServerHandler_ extends ServiceCall {
|
|||
bool _isTimedOut = false;
|
||||
Timer _timeoutTimer;
|
||||
|
||||
ServerHandler_(this._serviceLookup, this._stream, this._interceptors);
|
||||
ServerHandler_(
|
||||
this._serviceLookup,
|
||||
this._stream,
|
||||
this._interceptors,
|
||||
this._codecRegistry,
|
||||
);
|
||||
|
||||
DateTime get deadline => _deadline;
|
||||
|
||||
|
@ -74,7 +82,7 @@ class ServerHandler_ extends ServiceCall {
|
|||
|
||||
_incomingSubscription = _stream.incomingMessages
|
||||
.transform(GrpcHttpDecoder())
|
||||
.transform(grpcDecompressor())
|
||||
.transform(grpcDecompressor(codecRegistry: _codecRegistry))
|
||||
.listen(_onDataIdle,
|
||||
onError: _onError, onDone: _onDoneError, cancelOnError: true);
|
||||
_stream.outgoingMessages.done.then((_) {
|
||||
|
@ -112,6 +120,13 @@ class ServerHandler_ extends ServiceCall {
|
|||
}
|
||||
final serviceName = pathSegments[1];
|
||||
final methodName = pathSegments[2];
|
||||
if (_codecRegistry != null) {
|
||||
final acceptedEncodings =
|
||||
clientMetadata['grpc-accept-encoding']?.split(',') ?? [];
|
||||
_callEncodingCodec = acceptedEncodings
|
||||
.map(_codecRegistry.lookup)
|
||||
.firstWhere((c) => c != null, orElse: () => null);
|
||||
}
|
||||
|
||||
_service = _serviceLookup(serviceName);
|
||||
_descriptor = _service?.$lookupMethod(methodName);
|
||||
|
@ -251,7 +266,7 @@ class ServerHandler_ extends ServiceCall {
|
|||
if (!_headersSent) {
|
||||
sendHeaders();
|
||||
}
|
||||
_stream.sendData(frame(bytes));
|
||||
_stream.sendData(frame(bytes, _callEncodingCodec));
|
||||
} catch (error) {
|
||||
final grpcError = GrpcError.internal('Error sending response: $error');
|
||||
if (!_requests.isClosed) {
|
||||
|
@ -285,7 +300,9 @@ class ServerHandler_ extends ServiceCall {
|
|||
// TODO(jakobr): Should come from package:http2?
|
||||
final outgoingHeadersMap = <String, String>{
|
||||
':status': '200',
|
||||
'content-type': 'application/grpc'
|
||||
'content-type': 'application/grpc',
|
||||
if (_callEncodingCodec != null)
|
||||
'grpc-encoding': _callEncodingCodec.encodingName,
|
||||
};
|
||||
|
||||
outgoingHeadersMap.addAll(_customHeaders);
|
||||
|
@ -384,7 +401,10 @@ class ServerHandler_ extends ServiceCall {
|
|||
}
|
||||
|
||||
class ServerHandler extends ServerHandler_ {
|
||||
ServerHandler(Service Function(String service) serviceLookup, stream,
|
||||
[List<Interceptor> interceptors = const <Interceptor>[]])
|
||||
: super(serviceLookup, stream, interceptors);
|
||||
ServerHandler(
|
||||
Service Function(String service) serviceLookup,
|
||||
stream, [
|
||||
List<Interceptor> interceptors = const <Interceptor>[],
|
||||
CodecRegistry codecRegistry,
|
||||
]) : super(serviceLookup, stream, interceptors, codecRegistry);
|
||||
}
|
||||
|
|
|
@ -16,11 +16,11 @@
|
|||
import 'dart:async';
|
||||
import 'dart:io';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:http2/transport.dart';
|
||||
import 'package:meta/meta.dart';
|
||||
|
||||
import '../shared/security.dart';
|
||||
|
||||
import 'handler.dart';
|
||||
import 'interceptor.dart';
|
||||
import 'service.dart';
|
||||
|
@ -84,13 +84,17 @@ class ServerTlsCredentials extends ServerCredentials {
|
|||
class ConnectionServer {
|
||||
final Map<String, Service> _services = {};
|
||||
final List<Interceptor> _interceptors;
|
||||
final CodecRegistry _codecRegistry;
|
||||
|
||||
final _connections = <ServerTransportConnection>[];
|
||||
|
||||
/// Create a server for the given [services].
|
||||
ConnectionServer(List<Service> services,
|
||||
[List<Interceptor> interceptors = const <Interceptor>[]])
|
||||
: _interceptors = interceptors {
|
||||
ConnectionServer(
|
||||
List<Service> services, [
|
||||
List<Interceptor> interceptors = const <Interceptor>[],
|
||||
CodecRegistry codecRegistry,
|
||||
]) : _codecRegistry = codecRegistry,
|
||||
_interceptors = interceptors {
|
||||
for (final service in services) {
|
||||
_services[service.$name] = service;
|
||||
}
|
||||
|
@ -121,7 +125,8 @@ class ConnectionServer {
|
|||
|
||||
@visibleForTesting
|
||||
ServerHandler_ serveStream_(ServerTransportStream stream) {
|
||||
return ServerHandler_(lookupService, stream, _interceptors)..handle();
|
||||
return ServerHandler_(lookupService, stream, _interceptors, _codecRegistry)
|
||||
..handle();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,9 +138,11 @@ class Server extends ConnectionServer {
|
|||
SecureServerSocket _secureServer;
|
||||
|
||||
/// Create a server for the given [services].
|
||||
Server(List<Service> services,
|
||||
[List<Interceptor> interceptors = const <Interceptor>[]])
|
||||
: super(services, interceptors);
|
||||
Server(
|
||||
List<Service> services, [
|
||||
List<Interceptor> interceptors = const <Interceptor>[],
|
||||
CodecRegistry codecRegistry,
|
||||
]) : super(services, interceptors, codecRegistry);
|
||||
|
||||
/// The port that the server is listening on, or `null` if the server is not
|
||||
/// active.
|
||||
|
@ -193,7 +200,8 @@ class Server extends ConnectionServer {
|
|||
|
||||
@visibleForTesting
|
||||
ServerHandler_ serveStream_(ServerTransportStream stream) {
|
||||
return ServerHandler_(lookupService, stream, _interceptors)..handle();
|
||||
return ServerHandler_(lookupService, stream, _interceptors, _codecRegistry)
|
||||
..handle();
|
||||
}
|
||||
|
||||
@Deprecated(
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
import 'package:archive/archive.dart';
|
||||
|
||||
abstract class Codec {
|
||||
/// Returns the message encoding that this compressor uses.
|
||||
///
|
||||
/// This can be values such as "gzip", "deflate", "snappy", etc.
|
||||
String get encodingName;
|
||||
|
||||
/// Wraps an existing output stream with a compressed output.
|
||||
List<int> compress(List<int> data);
|
||||
|
||||
/// Wraps an existing output stream with a uncompressed input data.
|
||||
List<int> decompress(List<int> data);
|
||||
}
|
||||
|
||||
/// The "identity", or "none" codec.
|
||||
///
|
||||
/// This codec is special in that it can be used to explicitly disable Call
|
||||
/// compression on a Channel that by default compresses.
|
||||
class IdentityCodec implements Codec {
|
||||
const IdentityCodec();
|
||||
|
||||
@override
|
||||
String get encodingName => "identity";
|
||||
|
||||
@override
|
||||
List<int> compress(List<int> data) {
|
||||
return data;
|
||||
}
|
||||
|
||||
@override
|
||||
List<int> decompress(List<int> data) {
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
/// A gzip compressor and decompressor.
|
||||
class GzipCodec implements Codec {
|
||||
const GzipCodec();
|
||||
|
||||
@override
|
||||
String get encodingName => "gzip";
|
||||
|
||||
@override
|
||||
List<int> compress(List<int> data) {
|
||||
return GZipEncoder().encode(data);
|
||||
}
|
||||
|
||||
@override
|
||||
List<int> decompress(List<int> data) {
|
||||
return GZipDecoder().decodeBytes(data);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,50 @@
|
|||
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
|
||||
// for details. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
import 'codec.dart';
|
||||
|
||||
/// Encloses classes related to the compression and decompression of messages.
|
||||
class CodecRegistry {
|
||||
CodecRegistry({List<Codec> codecs = const [IdentityCodec()]})
|
||||
: assert(codecs != null),
|
||||
_codecs = Map.fromIterable(codecs, key: (c) => c.encodingName),
|
||||
_supportedEncodings = codecs.map((c) {
|
||||
if (c.encodingName.contains(',')) {
|
||||
throw ArgumentError.value(c.encodingName, 'codecs',
|
||||
'contains entries with names containing ","');
|
||||
}
|
||||
return c.encodingName;
|
||||
}).join(',') {
|
||||
if (_codecs.length != codecs.length) {
|
||||
throw ArgumentError.value(
|
||||
codecs, 'codecs', 'contains multiple entries with the same name');
|
||||
}
|
||||
}
|
||||
|
||||
factory CodecRegistry.empty() {
|
||||
return CodecRegistry(codecs: []);
|
||||
}
|
||||
|
||||
/// Key refers to the `encodingName` param from the [Codec].
|
||||
final Map<String, Codec> _codecs;
|
||||
|
||||
final String _supportedEncodings;
|
||||
|
||||
Codec lookup(String codecName) {
|
||||
return _codecs[codecName];
|
||||
}
|
||||
|
||||
String get supportedEncodings => _supportedEncodings;
|
||||
}
|
|
@ -16,6 +16,10 @@
|
|||
import 'dart:async';
|
||||
import 'dart:typed_data';
|
||||
|
||||
import 'codec.dart';
|
||||
import 'codec_registry.dart';
|
||||
import 'status.dart';
|
||||
|
||||
abstract class GrpcMessage {}
|
||||
|
||||
class GrpcMetadata extends GrpcMessage {
|
||||
|
@ -54,25 +58,38 @@ class GrpcMessageSink extends Sink<GrpcMessage> {
|
|||
}
|
||||
}
|
||||
|
||||
List<int> frame(List<int> payload) {
|
||||
final payloadLength = payload.length;
|
||||
List<int> frame(List<int> rawPayload, [Codec codec]) {
|
||||
final compressedPayload =
|
||||
codec == null ? rawPayload : codec.compress(rawPayload);
|
||||
final payloadLength = compressedPayload.length;
|
||||
final bytes = Uint8List(payloadLength + 5);
|
||||
final header = bytes.buffer.asByteData(0, 5);
|
||||
header.setUint8(0, 0); // TODO(dart-lang/grpc-dart#6): Handle compression
|
||||
header.setUint8(0, codec == null ? 0 : 1);
|
||||
header.setUint32(1, payloadLength);
|
||||
bytes.setRange(5, bytes.length, payload);
|
||||
bytes.setRange(5, bytes.length, compressedPayload);
|
||||
return bytes;
|
||||
}
|
||||
|
||||
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor() =>
|
||||
StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
|
||||
handleData: (GrpcMessage value, EventSink<GrpcMessage> sink) {
|
||||
if (value is GrpcData) {
|
||||
if (value.isCompressed) {
|
||||
// TODO(dart-lang/grpc-dart#6): Actually handle decompression.
|
||||
sink.add(GrpcData(value.data, isCompressed: false));
|
||||
return;
|
||||
}
|
||||
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor({
|
||||
CodecRegistry codecRegistry,
|
||||
}) {
|
||||
Codec codec;
|
||||
return StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
|
||||
handleData: (GrpcMessage value, EventSink<GrpcMessage> sink) {
|
||||
if (value is GrpcData && value.isCompressed) {
|
||||
if (codec == null) {
|
||||
sink.addError(
|
||||
GrpcError.unimplemented('Compression mechanism not supported'),
|
||||
);
|
||||
return;
|
||||
}
|
||||
sink.add(value);
|
||||
});
|
||||
sink.add(GrpcData(codec.decompress(value.data), isCompressed: false));
|
||||
return;
|
||||
}
|
||||
|
||||
if (value is GrpcMetadata && value.metadata.containsKey('grpc-encoding')) {
|
||||
codec = codecRegistry?.lookup(value.metadata['grpc-encoding']);
|
||||
}
|
||||
sink.add(value);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
name: grpc
|
||||
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
|
||||
|
||||
version: 2.8.0
|
||||
version: 2.9.0-dev
|
||||
|
||||
homepage: https://github.com/dart-lang/grpc-dart
|
||||
|
||||
|
@ -9,6 +9,7 @@ environment:
|
|||
sdk: '>=2.8.0 <3.0.0'
|
||||
|
||||
dependencies:
|
||||
archive: ^2.0.13
|
||||
async: ^2.2.0
|
||||
crypto: ^2.1.4
|
||||
fixnum: ^0.10.11
|
||||
|
|
|
@ -63,6 +63,30 @@ void main() {
|
|||
);
|
||||
});
|
||||
|
||||
test('Unary call attaches encoding headers', () async {
|
||||
const requestValue = 17;
|
||||
const responseValue = 19;
|
||||
|
||||
void handleRequest(StreamMessage message) {
|
||||
final data = validateDataMessage(message);
|
||||
expect(mockDecode(data.data), requestValue);
|
||||
|
||||
harness
|
||||
..sendResponseHeader()
|
||||
..sendResponseValue(responseValue)
|
||||
..sendResponseTrailer();
|
||||
}
|
||||
|
||||
await harness.runTest(
|
||||
clientCall: harness.client.unary(requestValue,
|
||||
options: CallOptions(metadata: {'grpc-accept-encoding': 'gzip'})),
|
||||
expectedResult: responseValue,
|
||||
expectedCustomHeaders: {'grpc-accept-encoding': 'gzip'},
|
||||
expectedPath: '/Test/Unary',
|
||||
serverHandlers: [handleRequest],
|
||||
);
|
||||
});
|
||||
|
||||
test('Client-streaming calls work on the client', () async {
|
||||
const requests = [17, 3];
|
||||
const response = 12;
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
@TestOn('vm')
|
||||
import 'dart:async';
|
||||
import 'dart:io';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:grpc/service_api.dart' as api;
|
||||
import 'package:grpc/src/client/channel.dart' hide ClientChannel;
|
||||
import 'package:grpc/src/client/connection.dart';
|
||||
import 'package:grpc/src/client/http2_connection.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
import 'common.dart';
|
||||
|
||||
class TestClient extends Client {
|
||||
|
@ -73,6 +75,30 @@ main() async {
|
|||
server.shutdown();
|
||||
});
|
||||
|
||||
testTcpAndUds('round trip with outgoing and incoming compression',
|
||||
(address) async {
|
||||
final Server server = Server(
|
||||
[TestService()], const [], CodecRegistry(codecs: const [GzipCodec()]));
|
||||
await server.serve(address: address, port: 0);
|
||||
|
||||
final channel = FixedConnectionClientChannel(Http2ClientConnection(
|
||||
address,
|
||||
server.port,
|
||||
ChannelOptions(
|
||||
credentials: ChannelCredentials.insecure(),
|
||||
codecRegistry: CodecRegistry(codecs: const [GzipCodec()]),
|
||||
),
|
||||
));
|
||||
final testClient = TestClient(channel);
|
||||
expect(
|
||||
await testClient
|
||||
.stream(TestService.requestFiniteStream,
|
||||
options: CallOptions(compression: const GzipCodec()))
|
||||
.toList(),
|
||||
[1, 2, 3]);
|
||||
await server.shutdown();
|
||||
});
|
||||
|
||||
testTcpAndUds('round trip secure connection', (address) async {
|
||||
// round trip test of secure connection.
|
||||
final Server server = Server([TestService()]);
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
import 'package:grpc/src/shared/codec.dart';
|
||||
import 'package:grpc/src/shared/codec_registry.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
void main() {
|
||||
test('CodecRegistry register adds new encodings', () {
|
||||
final registry = CodecRegistry();
|
||||
expect(registry.supportedEncodings, 'identity');
|
||||
});
|
||||
|
||||
test('CodecRegistry lookup', () {
|
||||
final registry = CodecRegistry();
|
||||
expect(registry.lookup('identity'), const IdentityCodec());
|
||||
});
|
||||
}
|
|
@ -16,14 +16,13 @@
|
|||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:grpc/src/client/channel.dart' as base;
|
||||
import 'package:grpc/src/client/http2_connection.dart';
|
||||
import 'package:grpc/src/shared/message.dart';
|
||||
import 'package:http2/transport.dart';
|
||||
import 'package:test/test.dart';
|
||||
import 'package:mockito/mockito.dart';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
import 'utils.dart';
|
||||
|
||||
|
@ -69,6 +68,7 @@ class FakeChannelOptions implements ChannelOptions {
|
|||
Duration connectionTimeout = const Duration(seconds: 10);
|
||||
String userAgent = 'dart-grpc/1.0.0 test';
|
||||
BackoffStrategy backoffStrategy = testBackoff;
|
||||
CodecRegistry codecRegistry = CodecRegistry.empty();
|
||||
}
|
||||
|
||||
class FakeChannel extends ClientChannel {
|
||||
|
|
|
@ -15,13 +15,12 @@
|
|||
|
||||
import 'dart:async';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
import 'package:grpc/src/client/http2_connection.dart';
|
||||
import 'package:grpc/src/shared/message.dart';
|
||||
import 'package:http2/transport.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
import 'package:grpc/grpc.dart';
|
||||
|
||||
import 'utils.dart';
|
||||
|
||||
class TestService extends Service {
|
||||
|
@ -189,7 +188,7 @@ abstract class _Harness {
|
|||
Map<String, String> metadata,
|
||||
Duration timeout}) {
|
||||
final headers = Http2ClientConnection.createCallHeaders(
|
||||
true, authority, path, timeout, metadata,
|
||||
true, authority, path, timeout, metadata, null,
|
||||
userAgent: 'dart-grpc/1.0.0 test');
|
||||
toServer.add(HeadersStreamMessage(headers));
|
||||
}
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
|
||||
import 'dart:convert';
|
||||
|
||||
import 'package:grpc/src/shared/streams.dart';
|
||||
import 'package:grpc/src/shared/message.dart';
|
||||
import 'package:grpc/src/shared/streams.dart';
|
||||
import 'package:http2/transport.dart';
|
||||
import 'package:test/test.dart';
|
||||
|
||||
|
@ -44,7 +44,6 @@ void validateRequestHeaders(Map<String, String> headers,
|
|||
expect(headers['grpc-timeout'], timeout);
|
||||
expect(headers['content-type'], 'application/grpc');
|
||||
expect(headers['te'], 'trailers');
|
||||
expect(headers['grpc-accept-encoding'], 'identity');
|
||||
expect(headers['user-agent'], startsWith('dart-grpc/'));
|
||||
|
||||
customHeaders?.forEach((key, value) {
|
||||
|
|
Loading…
Reference in New Issue