Support message compression (#409)

* Added support for compression/decompression, which can be configured through 
  `ChannelOptions` constructor's `codecRegistry` parameter or adding the 
  `grpc-accept-encoding` to `metadata` parameter of `CallOptions` on the client 
  side and `codecRegistry` parameter to `Server` on the server side.
  Outgoing rpc can be compressed using the `compression` parameter on the 
  `CallOptions`.

Closes #6
This commit is contained in:
Ankur Jain 2020-12-11 00:52:24 -08:00 committed by GitHub
parent e51c5a3d5d
commit c48af638a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 377 additions and 84 deletions

View File

@ -1,3 +1,12 @@
## 2.9.0-dev
* Added support for compression/decompression, which can be configured through
`ChannelOptions` constructor's `codecRegistry` parameter or adding the
`grpc-accept-encoding` to `metadata` parameter of `CallOptions` on the client
side and `codecRegistry` parameter to `Server` on the server side.
Outgoing rpc can be compressed using the `compression` parameter on the
`CallOptions`.
## 2.8.0
* Added support for client interceptors, which can be configured through

View File

@ -15,7 +15,6 @@
/// Dart implementation of the gRPC helloworld.Greeter client.
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pb.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
@ -23,14 +22,21 @@ Future<void> main(List<String> args) async {
final channel = ClientChannel(
'localhost',
port: 50051,
options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
options: ChannelOptions(
credentials: ChannelCredentials.insecure(),
codecRegistry:
CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
),
);
final stub = GreeterClient(channel);
final name = args.isNotEmpty ? args[0] : 'world';
try {
final response = await stub.sayHello(HelloRequest()..name = name);
final response = await stub.sayHello(
HelloRequest()..name = name,
options: CallOptions(compression: const GzipCodec()),
);
print('Greeter client received: ${response.message}');
} catch (e) {
print('Caught error: $e');

View File

@ -15,7 +15,6 @@
/// Dart implementation of the gRPC helloworld.Greeter server.
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pb.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
@ -27,7 +26,11 @@ class GreeterService extends GreeterServiceBase {
}
Future<void> main(List<String> args) async {
final server = Server([GreeterService()]);
final server = Server(
[GreeterService()],
const <Interceptor>[],
CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
);
await server.serve(port: 50051);
print('Server listening on port ${server.port}...');
}

View File

@ -31,10 +31,8 @@ export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
export 'src/client/connection.dart' show ConnectionState;
export 'src/client/http2_channel.dart'
show ClientChannel, ClientTransportConnectorChannel;
export 'src/client/interceptor.dart'
show ClientInterceptor, ClientUnaryInvoker, ClientStreamingInvoker;
export 'src/client/method.dart' show ClientMethod;
export 'src/client/options.dart'
show
@ -57,11 +55,11 @@ export 'src/server/server.dart'
ConnectionServer,
Server;
export 'src/server/service.dart' show ServiceMethod, Service;
export 'src/shared/codec.dart' show Codec, IdentityCodec, GzipCodec;
export 'src/shared/codec_registry.dart';
export 'src/shared/message.dart'
show GrpcMessage, GrpcMetadata, GrpcData, grpcDecompressor;
export 'src/shared/profiler.dart' show isTimelineLoggingEnabled;
export 'src/shared/security.dart'
show supportedAlpnProtocols, createSecurityContext;
export 'src/shared/status.dart' show StatusCode, GrpcError;

View File

@ -17,6 +17,7 @@ import 'dart:async';
import 'dart:convert';
import 'dart:developer';
import 'package:grpc/grpc.dart';
import 'package:grpc/src/generated/google/rpc/status.pb.dart';
import 'package:meta/meta.dart';
import 'package:protobuf/protobuf.dart';
@ -34,6 +35,7 @@ const _reservedHeaders = [
'te',
'grpc-timeout',
'grpc-accept-encoding',
'grpc-encoding',
'user-agent',
];
@ -55,8 +57,14 @@ class CallOptions {
final Map<String, String> metadata;
final Duration timeout;
final List<MetadataProvider> metadataProviders;
final Codec compression;
CallOptions._(this.metadata, this.timeout, this.metadataProviders);
CallOptions._(
this.metadata,
this.timeout,
this.metadataProviders,
this.compression,
);
/// Creates a [CallOptions] object.
///
@ -64,12 +72,18 @@ class CallOptions {
/// configure per-RPC metadata [providers]. The metadata [providers] are
/// invoked in order for every RPC, and can modify the outgoing metadata
/// (including metadata provided by previous providers).
factory CallOptions(
{Map<String, String> metadata,
Duration timeout,
List<MetadataProvider> providers}) {
return CallOptions._(Map.unmodifiable(metadata ?? {}), timeout,
List.unmodifiable(providers ?? []));
factory CallOptions({
Map<String, String> metadata,
Duration timeout,
List<MetadataProvider> providers,
Codec compression,
}) {
return CallOptions._(
Map.unmodifiable(metadata ?? {}),
timeout,
List.unmodifiable(providers ?? []),
compression,
);
}
factory CallOptions.from(Iterable<CallOptions> options) =>
@ -81,8 +95,13 @@ class CallOptions {
final mergedTimeout = other.timeout ?? timeout;
final mergedProviders = List.from(metadataProviders)
..addAll(other.metadataProviders);
return CallOptions._(Map.unmodifiable(mergedMetadata), mergedTimeout,
List.unmodifiable(mergedProviders));
final mergedCompression = other.compression ?? compression;
return CallOptions._(
Map.unmodifiable(mergedMetadata),
mergedTimeout,
List.unmodifiable(mergedProviders),
mergedCompression,
);
}
}
@ -109,7 +128,7 @@ class WebCallOptions extends CallOptions {
WebCallOptions._(Map<String, String> metadata, Duration timeout,
List<MetadataProvider> metadataProviders,
{this.bypassCorsPreflight, this.withCredentials})
: super._(metadata, timeout, metadataProviders);
: super._(metadata, timeout, metadataProviders, null);
/// Creates a [WebCallOptions] object.
///
@ -246,7 +265,12 @@ class ClientCall<Q, R> implements Response {
void _sendRequest(ClientConnection connection, Map<String, String> metadata) {
try {
_stream = connection.makeRequest(
_method.path, options.timeout, metadata, _onRequestError);
_method.path,
options.timeout,
metadata,
_onRequestError,
callOptions: options,
);
} catch (e) {
_terminateWithError(GrpcError.unavailable('Error making call: $e'));
return;

View File

@ -20,13 +20,12 @@ import 'dart:io';
import 'package:http2/transport.dart';
import 'package:meta/meta.dart';
import '../shared/codec.dart';
import '../shared/timeout.dart';
import 'call.dart';
import 'client_transport_connector.dart';
import 'connection.dart' hide ClientConnection;
import 'connection.dart' as connection;
import 'options.dart';
import 'transport/http2_credentials.dart';
import 'transport/http2_transport.dart';
@ -39,8 +38,6 @@ class Http2ClientConnection implements connection.ClientConnection {
static final _contentTypeGrpc =
Header.ascii('content-type', 'application/grpc');
static final _teTrailers = Header.ascii('te', 'trailers');
static final _grpcAcceptEncoding =
Header.ascii('grpc-accept-encoding', 'identity');
final ChannelOptions options;
@ -155,11 +152,25 @@ class Http2ClientConnection implements connection.ClientConnection {
GrpcTransportStream makeRequest(String path, Duration timeout,
Map<String, String> metadata, ErrorHandler onRequestFailure,
{CallOptions callOptions}) {
final headers = createCallHeaders(credentials.isSecure,
_transportConnector.authority, path, timeout, metadata,
userAgent: options.userAgent);
final compressionCodec = callOptions.compression;
final headers = createCallHeaders(
credentials.isSecure,
_transportConnector.authority,
path,
timeout,
metadata,
compressionCodec,
userAgent: options.userAgent,
grpcAcceptEncodings: callOptions.metadata['grpc-accept-encoding'] ??
options.codecRegistry?.supportedEncodings,
);
final stream = _transportConnection.makeRequest(headers);
return Http2TransportStream(stream, onRequestFailure);
return Http2TransportStream(
stream,
onRequestFailure,
options.codecRegistry,
compressionCodec,
);
}
void _startCall(ClientCall call) {
@ -272,24 +283,31 @@ class Http2ClientConnection implements connection.ClientConnection {
_timer = Timer(_currentReconnectDelay, _handleReconnect);
}
static List<Header> createCallHeaders(bool useTls, String authority,
String path, Duration timeout, Map<String, String> metadata,
{String userAgent}) {
static List<Header> createCallHeaders(
bool useTls,
String authority,
String path,
Duration timeout,
Map<String, String> metadata,
Codec compressionCodec, {
String userAgent,
String grpcAcceptEncodings,
}) {
final headers = [
_methodPost,
useTls ? _schemeHttps : _schemeHttp,
Header(ascii.encode(':path'), utf8.encode(path)),
Header(ascii.encode(':authority'), utf8.encode(authority)),
];
if (timeout != null) {
headers.add(Header.ascii('grpc-timeout', toTimeoutString(timeout)));
}
headers.addAll([
if (timeout != null)
Header.ascii('grpc-timeout', toTimeoutString(timeout)),
_contentTypeGrpc,
_teTrailers,
_grpcAcceptEncoding,
Header.ascii('user-agent', userAgent ?? defaultUserAgent),
]);
if (grpcAcceptEncodings != null)
Header.ascii('grpc-accept-encoding', grpcAcceptEncodings),
if (compressionCodec != null)
Header.ascii('grpc-encoding', compressionCodec.encodingName)
];
metadata?.forEach((key, value) {
headers.add(Header(ascii.encode(key), utf8.encode(value)));
});

View File

@ -14,6 +14,8 @@
// limitations under the License.
import 'dart:math';
import '../shared/codec_registry.dart';
import 'transport/http2_credentials.dart';
const defaultIdleTimeout = Duration(minutes: 5);
@ -44,6 +46,7 @@ Duration defaultBackoffStrategy(Duration lastBackoff) {
class ChannelOptions {
final ChannelCredentials credentials;
final Duration idleTimeout;
final CodecRegistry codecRegistry;
/// The maximum time a single connection will be used for new requests.
final Duration connectionTimeout;
@ -56,5 +59,6 @@ class ChannelOptions {
this.userAgent = defaultUserAgent,
this.backoffStrategy = defaultBackoffStrategy,
this.connectionTimeout = defaultConnectionTimeOut,
this.codecRegistry,
});
}

View File

@ -15,11 +15,11 @@
import 'dart:async';
import 'package:grpc/grpc.dart';
import 'package:http2/transport.dart';
import '../../shared/message.dart';
import '../../shared/streams.dart';
import 'transport.dart';
class Http2TransportStream extends GrpcTransportStream {
@ -30,12 +30,16 @@ class Http2TransportStream extends GrpcTransportStream {
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
Http2TransportStream(this._transportStream, this._onError)
: incomingMessages = _transportStream.incomingMessages
Http2TransportStream(
this._transportStream,
this._onError,
CodecRegistry codecRegistry,
Codec compression,
) : incomingMessages = _transportStream.incomingMessages
.transform(GrpcHttpDecoder())
.transform(grpcDecompressor()) {
.transform(grpcDecompressor(codecRegistry: codecRegistry)) {
_outgoingMessages.stream
.map(frame)
.map((payload) => frame(payload, compression))
.map<StreamMessage>((bytes) => DataStreamMessage(bytes))
.handleError(_onError)
.listen(_transportStream.outgoingMessages.add,

View File

@ -18,11 +18,12 @@ import 'dart:convert';
import 'package:http2/transport.dart';
import '../shared/codec.dart';
import '../shared/codec_registry.dart';
import '../shared/message.dart';
import '../shared/status.dart';
import '../shared/streams.dart';
import '../shared/timeout.dart';
import 'call.dart';
import 'interceptor.dart';
import 'service.dart';
@ -32,6 +33,7 @@ class ServerHandler_ extends ServiceCall {
final ServerTransportStream _stream;
final Service Function(String service) _serviceLookup;
final List<Interceptor> _interceptors;
final CodecRegistry _codecRegistry;
StreamSubscription<GrpcMessage> _incomingSubscription;
@ -39,6 +41,7 @@ class ServerHandler_ extends ServiceCall {
ServiceMethod _descriptor;
Map<String, String> _clientMetadata;
Codec _callEncodingCodec;
StreamController _requests;
bool _hasReceivedRequest = false;
@ -55,7 +58,12 @@ class ServerHandler_ extends ServiceCall {
bool _isTimedOut = false;
Timer _timeoutTimer;
ServerHandler_(this._serviceLookup, this._stream, this._interceptors);
ServerHandler_(
this._serviceLookup,
this._stream,
this._interceptors,
this._codecRegistry,
);
DateTime get deadline => _deadline;
@ -74,7 +82,7 @@ class ServerHandler_ extends ServiceCall {
_incomingSubscription = _stream.incomingMessages
.transform(GrpcHttpDecoder())
.transform(grpcDecompressor())
.transform(grpcDecompressor(codecRegistry: _codecRegistry))
.listen(_onDataIdle,
onError: _onError, onDone: _onDoneError, cancelOnError: true);
_stream.outgoingMessages.done.then((_) {
@ -112,6 +120,13 @@ class ServerHandler_ extends ServiceCall {
}
final serviceName = pathSegments[1];
final methodName = pathSegments[2];
if (_codecRegistry != null) {
final acceptedEncodings =
clientMetadata['grpc-accept-encoding']?.split(',') ?? [];
_callEncodingCodec = acceptedEncodings
.map(_codecRegistry.lookup)
.firstWhere((c) => c != null, orElse: () => null);
}
_service = _serviceLookup(serviceName);
_descriptor = _service?.$lookupMethod(methodName);
@ -251,7 +266,7 @@ class ServerHandler_ extends ServiceCall {
if (!_headersSent) {
sendHeaders();
}
_stream.sendData(frame(bytes));
_stream.sendData(frame(bytes, _callEncodingCodec));
} catch (error) {
final grpcError = GrpcError.internal('Error sending response: $error');
if (!_requests.isClosed) {
@ -285,7 +300,9 @@ class ServerHandler_ extends ServiceCall {
// TODO(jakobr): Should come from package:http2?
final outgoingHeadersMap = <String, String>{
':status': '200',
'content-type': 'application/grpc'
'content-type': 'application/grpc',
if (_callEncodingCodec != null)
'grpc-encoding': _callEncodingCodec.encodingName,
};
outgoingHeadersMap.addAll(_customHeaders);
@ -384,7 +401,10 @@ class ServerHandler_ extends ServiceCall {
}
class ServerHandler extends ServerHandler_ {
ServerHandler(Service Function(String service) serviceLookup, stream,
[List<Interceptor> interceptors = const <Interceptor>[]])
: super(serviceLookup, stream, interceptors);
ServerHandler(
Service Function(String service) serviceLookup,
stream, [
List<Interceptor> interceptors = const <Interceptor>[],
CodecRegistry codecRegistry,
]) : super(serviceLookup, stream, interceptors, codecRegistry);
}

View File

@ -16,11 +16,11 @@
import 'dart:async';
import 'dart:io';
import 'package:grpc/grpc.dart';
import 'package:http2/transport.dart';
import 'package:meta/meta.dart';
import '../shared/security.dart';
import 'handler.dart';
import 'interceptor.dart';
import 'service.dart';
@ -84,13 +84,17 @@ class ServerTlsCredentials extends ServerCredentials {
class ConnectionServer {
final Map<String, Service> _services = {};
final List<Interceptor> _interceptors;
final CodecRegistry _codecRegistry;
final _connections = <ServerTransportConnection>[];
/// Create a server for the given [services].
ConnectionServer(List<Service> services,
[List<Interceptor> interceptors = const <Interceptor>[]])
: _interceptors = interceptors {
ConnectionServer(
List<Service> services, [
List<Interceptor> interceptors = const <Interceptor>[],
CodecRegistry codecRegistry,
]) : _codecRegistry = codecRegistry,
_interceptors = interceptors {
for (final service in services) {
_services[service.$name] = service;
}
@ -121,7 +125,8 @@ class ConnectionServer {
@visibleForTesting
ServerHandler_ serveStream_(ServerTransportStream stream) {
return ServerHandler_(lookupService, stream, _interceptors)..handle();
return ServerHandler_(lookupService, stream, _interceptors, _codecRegistry)
..handle();
}
}
@ -133,9 +138,11 @@ class Server extends ConnectionServer {
SecureServerSocket _secureServer;
/// Create a server for the given [services].
Server(List<Service> services,
[List<Interceptor> interceptors = const <Interceptor>[]])
: super(services, interceptors);
Server(
List<Service> services, [
List<Interceptor> interceptors = const <Interceptor>[],
CodecRegistry codecRegistry,
]) : super(services, interceptors, codecRegistry);
/// The port that the server is listening on, or `null` if the server is not
/// active.
@ -193,7 +200,8 @@ class Server extends ConnectionServer {
@visibleForTesting
ServerHandler_ serveStream_(ServerTransportStream stream) {
return ServerHandler_(lookupService, stream, _interceptors)..handle();
return ServerHandler_(lookupService, stream, _interceptors, _codecRegistry)
..handle();
}
@Deprecated(

68
lib/src/shared/codec.dart Normal file
View File

@ -0,0 +1,68 @@
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'package:archive/archive.dart';
abstract class Codec {
/// Returns the message encoding that this compressor uses.
///
/// This can be values such as "gzip", "deflate", "snappy", etc.
String get encodingName;
/// Wraps an existing output stream with a compressed output.
List<int> compress(List<int> data);
/// Wraps an existing output stream with a uncompressed input data.
List<int> decompress(List<int> data);
}
/// The "identity", or "none" codec.
///
/// This codec is special in that it can be used to explicitly disable Call
/// compression on a Channel that by default compresses.
class IdentityCodec implements Codec {
const IdentityCodec();
@override
String get encodingName => "identity";
@override
List<int> compress(List<int> data) {
return data;
}
@override
List<int> decompress(List<int> data) {
return data;
}
}
/// A gzip compressor and decompressor.
class GzipCodec implements Codec {
const GzipCodec();
@override
String get encodingName => "gzip";
@override
List<int> compress(List<int> data) {
return GZipEncoder().encode(data);
}
@override
List<int> decompress(List<int> data) {
return GZipDecoder().decodeBytes(data);
}
}

View File

@ -0,0 +1,50 @@
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'codec.dart';
/// Encloses classes related to the compression and decompression of messages.
class CodecRegistry {
CodecRegistry({List<Codec> codecs = const [IdentityCodec()]})
: assert(codecs != null),
_codecs = Map.fromIterable(codecs, key: (c) => c.encodingName),
_supportedEncodings = codecs.map((c) {
if (c.encodingName.contains(',')) {
throw ArgumentError.value(c.encodingName, 'codecs',
'contains entries with names containing ","');
}
return c.encodingName;
}).join(',') {
if (_codecs.length != codecs.length) {
throw ArgumentError.value(
codecs, 'codecs', 'contains multiple entries with the same name');
}
}
factory CodecRegistry.empty() {
return CodecRegistry(codecs: []);
}
/// Key refers to the `encodingName` param from the [Codec].
final Map<String, Codec> _codecs;
final String _supportedEncodings;
Codec lookup(String codecName) {
return _codecs[codecName];
}
String get supportedEncodings => _supportedEncodings;
}

View File

@ -16,6 +16,10 @@
import 'dart:async';
import 'dart:typed_data';
import 'codec.dart';
import 'codec_registry.dart';
import 'status.dart';
abstract class GrpcMessage {}
class GrpcMetadata extends GrpcMessage {
@ -54,25 +58,38 @@ class GrpcMessageSink extends Sink<GrpcMessage> {
}
}
List<int> frame(List<int> payload) {
final payloadLength = payload.length;
List<int> frame(List<int> rawPayload, [Codec codec]) {
final compressedPayload =
codec == null ? rawPayload : codec.compress(rawPayload);
final payloadLength = compressedPayload.length;
final bytes = Uint8List(payloadLength + 5);
final header = bytes.buffer.asByteData(0, 5);
header.setUint8(0, 0); // TODO(dart-lang/grpc-dart#6): Handle compression
header.setUint8(0, codec == null ? 0 : 1);
header.setUint32(1, payloadLength);
bytes.setRange(5, bytes.length, payload);
bytes.setRange(5, bytes.length, compressedPayload);
return bytes;
}
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor() =>
StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
handleData: (GrpcMessage value, EventSink<GrpcMessage> sink) {
if (value is GrpcData) {
if (value.isCompressed) {
// TODO(dart-lang/grpc-dart#6): Actually handle decompression.
sink.add(GrpcData(value.data, isCompressed: false));
return;
}
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor({
CodecRegistry codecRegistry,
}) {
Codec codec;
return StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
handleData: (GrpcMessage value, EventSink<GrpcMessage> sink) {
if (value is GrpcData && value.isCompressed) {
if (codec == null) {
sink.addError(
GrpcError.unimplemented('Compression mechanism not supported'),
);
return;
}
sink.add(value);
});
sink.add(GrpcData(codec.decompress(value.data), isCompressed: false));
return;
}
if (value is GrpcMetadata && value.metadata.containsKey('grpc-encoding')) {
codec = codecRegistry?.lookup(value.metadata['grpc-encoding']);
}
sink.add(value);
});
}

View File

@ -1,7 +1,7 @@
name: grpc
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
version: 2.8.0
version: 2.9.0-dev
homepage: https://github.com/dart-lang/grpc-dart
@ -9,6 +9,7 @@ environment:
sdk: '>=2.8.0 <3.0.0'
dependencies:
archive: ^2.0.13
async: ^2.2.0
crypto: ^2.1.4
fixnum: ^0.10.11

View File

@ -63,6 +63,30 @@ void main() {
);
});
test('Unary call attaches encoding headers', () async {
const requestValue = 17;
const responseValue = 19;
void handleRequest(StreamMessage message) {
final data = validateDataMessage(message);
expect(mockDecode(data.data), requestValue);
harness
..sendResponseHeader()
..sendResponseValue(responseValue)
..sendResponseTrailer();
}
await harness.runTest(
clientCall: harness.client.unary(requestValue,
options: CallOptions(metadata: {'grpc-accept-encoding': 'gzip'})),
expectedResult: responseValue,
expectedCustomHeaders: {'grpc-accept-encoding': 'gzip'},
expectedPath: '/Test/Unary',
serverHandlers: [handleRequest],
);
});
test('Client-streaming calls work on the client', () async {
const requests = [17, 3];
const response = 12;

View File

@ -1,12 +1,14 @@
@TestOn('vm')
import 'dart:async';
import 'dart:io';
import 'package:grpc/grpc.dart';
import 'package:grpc/service_api.dart' as api;
import 'package:grpc/src/client/channel.dart' hide ClientChannel;
import 'package:grpc/src/client/connection.dart';
import 'package:grpc/src/client/http2_connection.dart';
import 'package:test/test.dart';
import 'common.dart';
class TestClient extends Client {
@ -73,6 +75,30 @@ main() async {
server.shutdown();
});
testTcpAndUds('round trip with outgoing and incoming compression',
(address) async {
final Server server = Server(
[TestService()], const [], CodecRegistry(codecs: const [GzipCodec()]));
await server.serve(address: address, port: 0);
final channel = FixedConnectionClientChannel(Http2ClientConnection(
address,
server.port,
ChannelOptions(
credentials: ChannelCredentials.insecure(),
codecRegistry: CodecRegistry(codecs: const [GzipCodec()]),
),
));
final testClient = TestClient(channel);
expect(
await testClient
.stream(TestService.requestFiniteStream,
options: CallOptions(compression: const GzipCodec()))
.toList(),
[1, 2, 3]);
await server.shutdown();
});
testTcpAndUds('round trip secure connection', (address) async {
// round trip test of secure connection.
final Server server = Server([TestService()]);

View File

@ -0,0 +1,15 @@
import 'package:grpc/src/shared/codec.dart';
import 'package:grpc/src/shared/codec_registry.dart';
import 'package:test/test.dart';
void main() {
test('CodecRegistry register adds new encodings', () {
final registry = CodecRegistry();
expect(registry.supportedEncodings, 'identity');
});
test('CodecRegistry lookup', () {
final registry = CodecRegistry();
expect(registry.lookup('identity'), const IdentityCodec());
});
}

View File

@ -16,14 +16,13 @@
import 'dart:async';
import 'dart:convert';
import 'package:grpc/grpc.dart';
import 'package:grpc/src/client/channel.dart' as base;
import 'package:grpc/src/client/http2_connection.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';
import 'package:mockito/mockito.dart';
import 'package:grpc/grpc.dart';
import 'package:test/test.dart';
import 'utils.dart';
@ -69,6 +68,7 @@ class FakeChannelOptions implements ChannelOptions {
Duration connectionTimeout = const Duration(seconds: 10);
String userAgent = 'dart-grpc/1.0.0 test';
BackoffStrategy backoffStrategy = testBackoff;
CodecRegistry codecRegistry = CodecRegistry.empty();
}
class FakeChannel extends ClientChannel {

View File

@ -15,13 +15,12 @@
import 'dart:async';
import 'package:grpc/grpc.dart';
import 'package:grpc/src/client/http2_connection.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';
import 'package:grpc/grpc.dart';
import 'utils.dart';
class TestService extends Service {
@ -189,7 +188,7 @@ abstract class _Harness {
Map<String, String> metadata,
Duration timeout}) {
final headers = Http2ClientConnection.createCallHeaders(
true, authority, path, timeout, metadata,
true, authority, path, timeout, metadata, null,
userAgent: 'dart-grpc/1.0.0 test');
toServer.add(HeadersStreamMessage(headers));
}

View File

@ -15,8 +15,8 @@
import 'dart:convert';
import 'package:grpc/src/shared/streams.dart';
import 'package:grpc/src/shared/message.dart';
import 'package:grpc/src/shared/streams.dart';
import 'package:http2/transport.dart';
import 'package:test/test.dart';
@ -44,7 +44,6 @@ void validateRequestHeaders(Map<String, String> headers,
expect(headers['grpc-timeout'], timeout);
expect(headers['content-type'], 'application/grpc');
expect(headers['te'], 'trailers');
expect(headers['grpc-accept-encoding'], 'identity');
expect(headers['user-agent'], startsWith('dart-grpc/'));
customHeaders?.forEach((key, value) {