mirror of https://github.com/grpc/grpc-dart.git
Support message compression (#409)
* Added support for compression/decompression, which can be configured through `ChannelOptions` constructor's `codecRegistry` parameter or adding the `grpc-accept-encoding` to `metadata` parameter of `CallOptions` on the client side and `codecRegistry` parameter to `Server` on the server side. Outgoing rpc can be compressed using the `compression` parameter on the `CallOptions`. Closes #6
This commit is contained in:
parent
e51c5a3d5d
commit
c48af638a5
|
@ -1,3 +1,12 @@
|
||||||
|
## 2.9.0-dev
|
||||||
|
|
||||||
|
* Added support for compression/decompression, which can be configured through
|
||||||
|
`ChannelOptions` constructor's `codecRegistry` parameter or adding the
|
||||||
|
`grpc-accept-encoding` to `metadata` parameter of `CallOptions` on the client
|
||||||
|
side and `codecRegistry` parameter to `Server` on the server side.
|
||||||
|
Outgoing rpc can be compressed using the `compression` parameter on the
|
||||||
|
`CallOptions`.
|
||||||
|
|
||||||
## 2.8.0
|
## 2.8.0
|
||||||
|
|
||||||
* Added support for client interceptors, which can be configured through
|
* Added support for client interceptors, which can be configured through
|
||||||
|
|
|
@ -15,7 +15,6 @@
|
||||||
|
|
||||||
/// Dart implementation of the gRPC helloworld.Greeter client.
|
/// Dart implementation of the gRPC helloworld.Greeter client.
|
||||||
import 'package:grpc/grpc.dart';
|
import 'package:grpc/grpc.dart';
|
||||||
|
|
||||||
import 'package:helloworld/src/generated/helloworld.pb.dart';
|
import 'package:helloworld/src/generated/helloworld.pb.dart';
|
||||||
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
|
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
|
||||||
|
|
||||||
|
@ -23,14 +22,21 @@ Future<void> main(List<String> args) async {
|
||||||
final channel = ClientChannel(
|
final channel = ClientChannel(
|
||||||
'localhost',
|
'localhost',
|
||||||
port: 50051,
|
port: 50051,
|
||||||
options: const ChannelOptions(credentials: ChannelCredentials.insecure()),
|
options: ChannelOptions(
|
||||||
|
credentials: ChannelCredentials.insecure(),
|
||||||
|
codecRegistry:
|
||||||
|
CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
|
||||||
|
),
|
||||||
);
|
);
|
||||||
final stub = GreeterClient(channel);
|
final stub = GreeterClient(channel);
|
||||||
|
|
||||||
final name = args.isNotEmpty ? args[0] : 'world';
|
final name = args.isNotEmpty ? args[0] : 'world';
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final response = await stub.sayHello(HelloRequest()..name = name);
|
final response = await stub.sayHello(
|
||||||
|
HelloRequest()..name = name,
|
||||||
|
options: CallOptions(compression: const GzipCodec()),
|
||||||
|
);
|
||||||
print('Greeter client received: ${response.message}');
|
print('Greeter client received: ${response.message}');
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
print('Caught error: $e');
|
print('Caught error: $e');
|
||||||
|
|
|
@ -15,7 +15,6 @@
|
||||||
|
|
||||||
/// Dart implementation of the gRPC helloworld.Greeter server.
|
/// Dart implementation of the gRPC helloworld.Greeter server.
|
||||||
import 'package:grpc/grpc.dart';
|
import 'package:grpc/grpc.dart';
|
||||||
|
|
||||||
import 'package:helloworld/src/generated/helloworld.pb.dart';
|
import 'package:helloworld/src/generated/helloworld.pb.dart';
|
||||||
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
|
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';
|
||||||
|
|
||||||
|
@ -27,7 +26,11 @@ class GreeterService extends GreeterServiceBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
Future<void> main(List<String> args) async {
|
Future<void> main(List<String> args) async {
|
||||||
final server = Server([GreeterService()]);
|
final server = Server(
|
||||||
|
[GreeterService()],
|
||||||
|
const <Interceptor>[],
|
||||||
|
CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
|
||||||
|
);
|
||||||
await server.serve(port: 50051);
|
await server.serve(port: 50051);
|
||||||
print('Server listening on port ${server.port}...');
|
print('Server listening on port ${server.port}...');
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,10 +31,8 @@ export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture;
|
||||||
export 'src/client/connection.dart' show ConnectionState;
|
export 'src/client/connection.dart' show ConnectionState;
|
||||||
export 'src/client/http2_channel.dart'
|
export 'src/client/http2_channel.dart'
|
||||||
show ClientChannel, ClientTransportConnectorChannel;
|
show ClientChannel, ClientTransportConnectorChannel;
|
||||||
|
|
||||||
export 'src/client/interceptor.dart'
|
export 'src/client/interceptor.dart'
|
||||||
show ClientInterceptor, ClientUnaryInvoker, ClientStreamingInvoker;
|
show ClientInterceptor, ClientUnaryInvoker, ClientStreamingInvoker;
|
||||||
|
|
||||||
export 'src/client/method.dart' show ClientMethod;
|
export 'src/client/method.dart' show ClientMethod;
|
||||||
export 'src/client/options.dart'
|
export 'src/client/options.dart'
|
||||||
show
|
show
|
||||||
|
@ -57,11 +55,11 @@ export 'src/server/server.dart'
|
||||||
ConnectionServer,
|
ConnectionServer,
|
||||||
Server;
|
Server;
|
||||||
export 'src/server/service.dart' show ServiceMethod, Service;
|
export 'src/server/service.dart' show ServiceMethod, Service;
|
||||||
|
export 'src/shared/codec.dart' show Codec, IdentityCodec, GzipCodec;
|
||||||
|
export 'src/shared/codec_registry.dart';
|
||||||
export 'src/shared/message.dart'
|
export 'src/shared/message.dart'
|
||||||
show GrpcMessage, GrpcMetadata, GrpcData, grpcDecompressor;
|
show GrpcMessage, GrpcMetadata, GrpcData, grpcDecompressor;
|
||||||
|
|
||||||
export 'src/shared/profiler.dart' show isTimelineLoggingEnabled;
|
export 'src/shared/profiler.dart' show isTimelineLoggingEnabled;
|
||||||
|
|
||||||
export 'src/shared/security.dart'
|
export 'src/shared/security.dart'
|
||||||
show supportedAlpnProtocols, createSecurityContext;
|
show supportedAlpnProtocols, createSecurityContext;
|
||||||
export 'src/shared/status.dart' show StatusCode, GrpcError;
|
export 'src/shared/status.dart' show StatusCode, GrpcError;
|
||||||
|
|
|
@ -17,6 +17,7 @@ import 'dart:async';
|
||||||
import 'dart:convert';
|
import 'dart:convert';
|
||||||
import 'dart:developer';
|
import 'dart:developer';
|
||||||
|
|
||||||
|
import 'package:grpc/grpc.dart';
|
||||||
import 'package:grpc/src/generated/google/rpc/status.pb.dart';
|
import 'package:grpc/src/generated/google/rpc/status.pb.dart';
|
||||||
import 'package:meta/meta.dart';
|
import 'package:meta/meta.dart';
|
||||||
import 'package:protobuf/protobuf.dart';
|
import 'package:protobuf/protobuf.dart';
|
||||||
|
@ -34,6 +35,7 @@ const _reservedHeaders = [
|
||||||
'te',
|
'te',
|
||||||
'grpc-timeout',
|
'grpc-timeout',
|
||||||
'grpc-accept-encoding',
|
'grpc-accept-encoding',
|
||||||
|
'grpc-encoding',
|
||||||
'user-agent',
|
'user-agent',
|
||||||
];
|
];
|
||||||
|
|
||||||
|
@ -55,8 +57,14 @@ class CallOptions {
|
||||||
final Map<String, String> metadata;
|
final Map<String, String> metadata;
|
||||||
final Duration timeout;
|
final Duration timeout;
|
||||||
final List<MetadataProvider> metadataProviders;
|
final List<MetadataProvider> metadataProviders;
|
||||||
|
final Codec compression;
|
||||||
|
|
||||||
CallOptions._(this.metadata, this.timeout, this.metadataProviders);
|
CallOptions._(
|
||||||
|
this.metadata,
|
||||||
|
this.timeout,
|
||||||
|
this.metadataProviders,
|
||||||
|
this.compression,
|
||||||
|
);
|
||||||
|
|
||||||
/// Creates a [CallOptions] object.
|
/// Creates a [CallOptions] object.
|
||||||
///
|
///
|
||||||
|
@ -64,12 +72,18 @@ class CallOptions {
|
||||||
/// configure per-RPC metadata [providers]. The metadata [providers] are
|
/// configure per-RPC metadata [providers]. The metadata [providers] are
|
||||||
/// invoked in order for every RPC, and can modify the outgoing metadata
|
/// invoked in order for every RPC, and can modify the outgoing metadata
|
||||||
/// (including metadata provided by previous providers).
|
/// (including metadata provided by previous providers).
|
||||||
factory CallOptions(
|
factory CallOptions({
|
||||||
{Map<String, String> metadata,
|
Map<String, String> metadata,
|
||||||
Duration timeout,
|
Duration timeout,
|
||||||
List<MetadataProvider> providers}) {
|
List<MetadataProvider> providers,
|
||||||
return CallOptions._(Map.unmodifiable(metadata ?? {}), timeout,
|
Codec compression,
|
||||||
List.unmodifiable(providers ?? []));
|
}) {
|
||||||
|
return CallOptions._(
|
||||||
|
Map.unmodifiable(metadata ?? {}),
|
||||||
|
timeout,
|
||||||
|
List.unmodifiable(providers ?? []),
|
||||||
|
compression,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
factory CallOptions.from(Iterable<CallOptions> options) =>
|
factory CallOptions.from(Iterable<CallOptions> options) =>
|
||||||
|
@ -81,8 +95,13 @@ class CallOptions {
|
||||||
final mergedTimeout = other.timeout ?? timeout;
|
final mergedTimeout = other.timeout ?? timeout;
|
||||||
final mergedProviders = List.from(metadataProviders)
|
final mergedProviders = List.from(metadataProviders)
|
||||||
..addAll(other.metadataProviders);
|
..addAll(other.metadataProviders);
|
||||||
return CallOptions._(Map.unmodifiable(mergedMetadata), mergedTimeout,
|
final mergedCompression = other.compression ?? compression;
|
||||||
List.unmodifiable(mergedProviders));
|
return CallOptions._(
|
||||||
|
Map.unmodifiable(mergedMetadata),
|
||||||
|
mergedTimeout,
|
||||||
|
List.unmodifiable(mergedProviders),
|
||||||
|
mergedCompression,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,7 +128,7 @@ class WebCallOptions extends CallOptions {
|
||||||
WebCallOptions._(Map<String, String> metadata, Duration timeout,
|
WebCallOptions._(Map<String, String> metadata, Duration timeout,
|
||||||
List<MetadataProvider> metadataProviders,
|
List<MetadataProvider> metadataProviders,
|
||||||
{this.bypassCorsPreflight, this.withCredentials})
|
{this.bypassCorsPreflight, this.withCredentials})
|
||||||
: super._(metadata, timeout, metadataProviders);
|
: super._(metadata, timeout, metadataProviders, null);
|
||||||
|
|
||||||
/// Creates a [WebCallOptions] object.
|
/// Creates a [WebCallOptions] object.
|
||||||
///
|
///
|
||||||
|
@ -246,7 +265,12 @@ class ClientCall<Q, R> implements Response {
|
||||||
void _sendRequest(ClientConnection connection, Map<String, String> metadata) {
|
void _sendRequest(ClientConnection connection, Map<String, String> metadata) {
|
||||||
try {
|
try {
|
||||||
_stream = connection.makeRequest(
|
_stream = connection.makeRequest(
|
||||||
_method.path, options.timeout, metadata, _onRequestError);
|
_method.path,
|
||||||
|
options.timeout,
|
||||||
|
metadata,
|
||||||
|
_onRequestError,
|
||||||
|
callOptions: options,
|
||||||
|
);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
_terminateWithError(GrpcError.unavailable('Error making call: $e'));
|
_terminateWithError(GrpcError.unavailable('Error making call: $e'));
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -20,13 +20,12 @@ import 'dart:io';
|
||||||
import 'package:http2/transport.dart';
|
import 'package:http2/transport.dart';
|
||||||
import 'package:meta/meta.dart';
|
import 'package:meta/meta.dart';
|
||||||
|
|
||||||
|
import '../shared/codec.dart';
|
||||||
import '../shared/timeout.dart';
|
import '../shared/timeout.dart';
|
||||||
|
|
||||||
import 'call.dart';
|
import 'call.dart';
|
||||||
import 'client_transport_connector.dart';
|
import 'client_transport_connector.dart';
|
||||||
import 'connection.dart' hide ClientConnection;
|
import 'connection.dart' hide ClientConnection;
|
||||||
import 'connection.dart' as connection;
|
import 'connection.dart' as connection;
|
||||||
|
|
||||||
import 'options.dart';
|
import 'options.dart';
|
||||||
import 'transport/http2_credentials.dart';
|
import 'transport/http2_credentials.dart';
|
||||||
import 'transport/http2_transport.dart';
|
import 'transport/http2_transport.dart';
|
||||||
|
@ -39,8 +38,6 @@ class Http2ClientConnection implements connection.ClientConnection {
|
||||||
static final _contentTypeGrpc =
|
static final _contentTypeGrpc =
|
||||||
Header.ascii('content-type', 'application/grpc');
|
Header.ascii('content-type', 'application/grpc');
|
||||||
static final _teTrailers = Header.ascii('te', 'trailers');
|
static final _teTrailers = Header.ascii('te', 'trailers');
|
||||||
static final _grpcAcceptEncoding =
|
|
||||||
Header.ascii('grpc-accept-encoding', 'identity');
|
|
||||||
|
|
||||||
final ChannelOptions options;
|
final ChannelOptions options;
|
||||||
|
|
||||||
|
@ -155,11 +152,25 @@ class Http2ClientConnection implements connection.ClientConnection {
|
||||||
GrpcTransportStream makeRequest(String path, Duration timeout,
|
GrpcTransportStream makeRequest(String path, Duration timeout,
|
||||||
Map<String, String> metadata, ErrorHandler onRequestFailure,
|
Map<String, String> metadata, ErrorHandler onRequestFailure,
|
||||||
{CallOptions callOptions}) {
|
{CallOptions callOptions}) {
|
||||||
final headers = createCallHeaders(credentials.isSecure,
|
final compressionCodec = callOptions.compression;
|
||||||
_transportConnector.authority, path, timeout, metadata,
|
final headers = createCallHeaders(
|
||||||
userAgent: options.userAgent);
|
credentials.isSecure,
|
||||||
|
_transportConnector.authority,
|
||||||
|
path,
|
||||||
|
timeout,
|
||||||
|
metadata,
|
||||||
|
compressionCodec,
|
||||||
|
userAgent: options.userAgent,
|
||||||
|
grpcAcceptEncodings: callOptions.metadata['grpc-accept-encoding'] ??
|
||||||
|
options.codecRegistry?.supportedEncodings,
|
||||||
|
);
|
||||||
final stream = _transportConnection.makeRequest(headers);
|
final stream = _transportConnection.makeRequest(headers);
|
||||||
return Http2TransportStream(stream, onRequestFailure);
|
return Http2TransportStream(
|
||||||
|
stream,
|
||||||
|
onRequestFailure,
|
||||||
|
options.codecRegistry,
|
||||||
|
compressionCodec,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
void _startCall(ClientCall call) {
|
void _startCall(ClientCall call) {
|
||||||
|
@ -272,24 +283,31 @@ class Http2ClientConnection implements connection.ClientConnection {
|
||||||
_timer = Timer(_currentReconnectDelay, _handleReconnect);
|
_timer = Timer(_currentReconnectDelay, _handleReconnect);
|
||||||
}
|
}
|
||||||
|
|
||||||
static List<Header> createCallHeaders(bool useTls, String authority,
|
static List<Header> createCallHeaders(
|
||||||
String path, Duration timeout, Map<String, String> metadata,
|
bool useTls,
|
||||||
{String userAgent}) {
|
String authority,
|
||||||
|
String path,
|
||||||
|
Duration timeout,
|
||||||
|
Map<String, String> metadata,
|
||||||
|
Codec compressionCodec, {
|
||||||
|
String userAgent,
|
||||||
|
String grpcAcceptEncodings,
|
||||||
|
}) {
|
||||||
final headers = [
|
final headers = [
|
||||||
_methodPost,
|
_methodPost,
|
||||||
useTls ? _schemeHttps : _schemeHttp,
|
useTls ? _schemeHttps : _schemeHttp,
|
||||||
Header(ascii.encode(':path'), utf8.encode(path)),
|
Header(ascii.encode(':path'), utf8.encode(path)),
|
||||||
Header(ascii.encode(':authority'), utf8.encode(authority)),
|
Header(ascii.encode(':authority'), utf8.encode(authority)),
|
||||||
];
|
if (timeout != null)
|
||||||
if (timeout != null) {
|
Header.ascii('grpc-timeout', toTimeoutString(timeout)),
|
||||||
headers.add(Header.ascii('grpc-timeout', toTimeoutString(timeout)));
|
|
||||||
}
|
|
||||||
headers.addAll([
|
|
||||||
_contentTypeGrpc,
|
_contentTypeGrpc,
|
||||||
_teTrailers,
|
_teTrailers,
|
||||||
_grpcAcceptEncoding,
|
|
||||||
Header.ascii('user-agent', userAgent ?? defaultUserAgent),
|
Header.ascii('user-agent', userAgent ?? defaultUserAgent),
|
||||||
]);
|
if (grpcAcceptEncodings != null)
|
||||||
|
Header.ascii('grpc-accept-encoding', grpcAcceptEncodings),
|
||||||
|
if (compressionCodec != null)
|
||||||
|
Header.ascii('grpc-encoding', compressionCodec.encodingName)
|
||||||
|
];
|
||||||
metadata?.forEach((key, value) {
|
metadata?.forEach((key, value) {
|
||||||
headers.add(Header(ascii.encode(key), utf8.encode(value)));
|
headers.add(Header(ascii.encode(key), utf8.encode(value)));
|
||||||
});
|
});
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
import 'dart:math';
|
import 'dart:math';
|
||||||
|
|
||||||
|
import '../shared/codec_registry.dart';
|
||||||
import 'transport/http2_credentials.dart';
|
import 'transport/http2_credentials.dart';
|
||||||
|
|
||||||
const defaultIdleTimeout = Duration(minutes: 5);
|
const defaultIdleTimeout = Duration(minutes: 5);
|
||||||
|
@ -44,6 +46,7 @@ Duration defaultBackoffStrategy(Duration lastBackoff) {
|
||||||
class ChannelOptions {
|
class ChannelOptions {
|
||||||
final ChannelCredentials credentials;
|
final ChannelCredentials credentials;
|
||||||
final Duration idleTimeout;
|
final Duration idleTimeout;
|
||||||
|
final CodecRegistry codecRegistry;
|
||||||
|
|
||||||
/// The maximum time a single connection will be used for new requests.
|
/// The maximum time a single connection will be used for new requests.
|
||||||
final Duration connectionTimeout;
|
final Duration connectionTimeout;
|
||||||
|
@ -56,5 +59,6 @@ class ChannelOptions {
|
||||||
this.userAgent = defaultUserAgent,
|
this.userAgent = defaultUserAgent,
|
||||||
this.backoffStrategy = defaultBackoffStrategy,
|
this.backoffStrategy = defaultBackoffStrategy,
|
||||||
this.connectionTimeout = defaultConnectionTimeOut,
|
this.connectionTimeout = defaultConnectionTimeOut,
|
||||||
|
this.codecRegistry,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,11 +15,11 @@
|
||||||
|
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:grpc/grpc.dart';
|
||||||
import 'package:http2/transport.dart';
|
import 'package:http2/transport.dart';
|
||||||
|
|
||||||
import '../../shared/message.dart';
|
import '../../shared/message.dart';
|
||||||
import '../../shared/streams.dart';
|
import '../../shared/streams.dart';
|
||||||
|
|
||||||
import 'transport.dart';
|
import 'transport.dart';
|
||||||
|
|
||||||
class Http2TransportStream extends GrpcTransportStream {
|
class Http2TransportStream extends GrpcTransportStream {
|
||||||
|
@ -30,12 +30,16 @@ class Http2TransportStream extends GrpcTransportStream {
|
||||||
|
|
||||||
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
|
StreamSink<List<int>> get outgoingMessages => _outgoingMessages.sink;
|
||||||
|
|
||||||
Http2TransportStream(this._transportStream, this._onError)
|
Http2TransportStream(
|
||||||
: incomingMessages = _transportStream.incomingMessages
|
this._transportStream,
|
||||||
|
this._onError,
|
||||||
|
CodecRegistry codecRegistry,
|
||||||
|
Codec compression,
|
||||||
|
) : incomingMessages = _transportStream.incomingMessages
|
||||||
.transform(GrpcHttpDecoder())
|
.transform(GrpcHttpDecoder())
|
||||||
.transform(grpcDecompressor()) {
|
.transform(grpcDecompressor(codecRegistry: codecRegistry)) {
|
||||||
_outgoingMessages.stream
|
_outgoingMessages.stream
|
||||||
.map(frame)
|
.map((payload) => frame(payload, compression))
|
||||||
.map<StreamMessage>((bytes) => DataStreamMessage(bytes))
|
.map<StreamMessage>((bytes) => DataStreamMessage(bytes))
|
||||||
.handleError(_onError)
|
.handleError(_onError)
|
||||||
.listen(_transportStream.outgoingMessages.add,
|
.listen(_transportStream.outgoingMessages.add,
|
||||||
|
|
|
@ -18,11 +18,12 @@ import 'dart:convert';
|
||||||
|
|
||||||
import 'package:http2/transport.dart';
|
import 'package:http2/transport.dart';
|
||||||
|
|
||||||
|
import '../shared/codec.dart';
|
||||||
|
import '../shared/codec_registry.dart';
|
||||||
import '../shared/message.dart';
|
import '../shared/message.dart';
|
||||||
import '../shared/status.dart';
|
import '../shared/status.dart';
|
||||||
import '../shared/streams.dart';
|
import '../shared/streams.dart';
|
||||||
import '../shared/timeout.dart';
|
import '../shared/timeout.dart';
|
||||||
|
|
||||||
import 'call.dart';
|
import 'call.dart';
|
||||||
import 'interceptor.dart';
|
import 'interceptor.dart';
|
||||||
import 'service.dart';
|
import 'service.dart';
|
||||||
|
@ -32,6 +33,7 @@ class ServerHandler_ extends ServiceCall {
|
||||||
final ServerTransportStream _stream;
|
final ServerTransportStream _stream;
|
||||||
final Service Function(String service) _serviceLookup;
|
final Service Function(String service) _serviceLookup;
|
||||||
final List<Interceptor> _interceptors;
|
final List<Interceptor> _interceptors;
|
||||||
|
final CodecRegistry _codecRegistry;
|
||||||
|
|
||||||
StreamSubscription<GrpcMessage> _incomingSubscription;
|
StreamSubscription<GrpcMessage> _incomingSubscription;
|
||||||
|
|
||||||
|
@ -39,6 +41,7 @@ class ServerHandler_ extends ServiceCall {
|
||||||
ServiceMethod _descriptor;
|
ServiceMethod _descriptor;
|
||||||
|
|
||||||
Map<String, String> _clientMetadata;
|
Map<String, String> _clientMetadata;
|
||||||
|
Codec _callEncodingCodec;
|
||||||
|
|
||||||
StreamController _requests;
|
StreamController _requests;
|
||||||
bool _hasReceivedRequest = false;
|
bool _hasReceivedRequest = false;
|
||||||
|
@ -55,7 +58,12 @@ class ServerHandler_ extends ServiceCall {
|
||||||
bool _isTimedOut = false;
|
bool _isTimedOut = false;
|
||||||
Timer _timeoutTimer;
|
Timer _timeoutTimer;
|
||||||
|
|
||||||
ServerHandler_(this._serviceLookup, this._stream, this._interceptors);
|
ServerHandler_(
|
||||||
|
this._serviceLookup,
|
||||||
|
this._stream,
|
||||||
|
this._interceptors,
|
||||||
|
this._codecRegistry,
|
||||||
|
);
|
||||||
|
|
||||||
DateTime get deadline => _deadline;
|
DateTime get deadline => _deadline;
|
||||||
|
|
||||||
|
@ -74,7 +82,7 @@ class ServerHandler_ extends ServiceCall {
|
||||||
|
|
||||||
_incomingSubscription = _stream.incomingMessages
|
_incomingSubscription = _stream.incomingMessages
|
||||||
.transform(GrpcHttpDecoder())
|
.transform(GrpcHttpDecoder())
|
||||||
.transform(grpcDecompressor())
|
.transform(grpcDecompressor(codecRegistry: _codecRegistry))
|
||||||
.listen(_onDataIdle,
|
.listen(_onDataIdle,
|
||||||
onError: _onError, onDone: _onDoneError, cancelOnError: true);
|
onError: _onError, onDone: _onDoneError, cancelOnError: true);
|
||||||
_stream.outgoingMessages.done.then((_) {
|
_stream.outgoingMessages.done.then((_) {
|
||||||
|
@ -112,6 +120,13 @@ class ServerHandler_ extends ServiceCall {
|
||||||
}
|
}
|
||||||
final serviceName = pathSegments[1];
|
final serviceName = pathSegments[1];
|
||||||
final methodName = pathSegments[2];
|
final methodName = pathSegments[2];
|
||||||
|
if (_codecRegistry != null) {
|
||||||
|
final acceptedEncodings =
|
||||||
|
clientMetadata['grpc-accept-encoding']?.split(',') ?? [];
|
||||||
|
_callEncodingCodec = acceptedEncodings
|
||||||
|
.map(_codecRegistry.lookup)
|
||||||
|
.firstWhere((c) => c != null, orElse: () => null);
|
||||||
|
}
|
||||||
|
|
||||||
_service = _serviceLookup(serviceName);
|
_service = _serviceLookup(serviceName);
|
||||||
_descriptor = _service?.$lookupMethod(methodName);
|
_descriptor = _service?.$lookupMethod(methodName);
|
||||||
|
@ -251,7 +266,7 @@ class ServerHandler_ extends ServiceCall {
|
||||||
if (!_headersSent) {
|
if (!_headersSent) {
|
||||||
sendHeaders();
|
sendHeaders();
|
||||||
}
|
}
|
||||||
_stream.sendData(frame(bytes));
|
_stream.sendData(frame(bytes, _callEncodingCodec));
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
final grpcError = GrpcError.internal('Error sending response: $error');
|
final grpcError = GrpcError.internal('Error sending response: $error');
|
||||||
if (!_requests.isClosed) {
|
if (!_requests.isClosed) {
|
||||||
|
@ -285,7 +300,9 @@ class ServerHandler_ extends ServiceCall {
|
||||||
// TODO(jakobr): Should come from package:http2?
|
// TODO(jakobr): Should come from package:http2?
|
||||||
final outgoingHeadersMap = <String, String>{
|
final outgoingHeadersMap = <String, String>{
|
||||||
':status': '200',
|
':status': '200',
|
||||||
'content-type': 'application/grpc'
|
'content-type': 'application/grpc',
|
||||||
|
if (_callEncodingCodec != null)
|
||||||
|
'grpc-encoding': _callEncodingCodec.encodingName,
|
||||||
};
|
};
|
||||||
|
|
||||||
outgoingHeadersMap.addAll(_customHeaders);
|
outgoingHeadersMap.addAll(_customHeaders);
|
||||||
|
@ -384,7 +401,10 @@ class ServerHandler_ extends ServiceCall {
|
||||||
}
|
}
|
||||||
|
|
||||||
class ServerHandler extends ServerHandler_ {
|
class ServerHandler extends ServerHandler_ {
|
||||||
ServerHandler(Service Function(String service) serviceLookup, stream,
|
ServerHandler(
|
||||||
[List<Interceptor> interceptors = const <Interceptor>[]])
|
Service Function(String service) serviceLookup,
|
||||||
: super(serviceLookup, stream, interceptors);
|
stream, [
|
||||||
|
List<Interceptor> interceptors = const <Interceptor>[],
|
||||||
|
CodecRegistry codecRegistry,
|
||||||
|
]) : super(serviceLookup, stream, interceptors, codecRegistry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,11 +16,11 @@
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'dart:io';
|
import 'dart:io';
|
||||||
|
|
||||||
|
import 'package:grpc/grpc.dart';
|
||||||
import 'package:http2/transport.dart';
|
import 'package:http2/transport.dart';
|
||||||
import 'package:meta/meta.dart';
|
import 'package:meta/meta.dart';
|
||||||
|
|
||||||
import '../shared/security.dart';
|
import '../shared/security.dart';
|
||||||
|
|
||||||
import 'handler.dart';
|
import 'handler.dart';
|
||||||
import 'interceptor.dart';
|
import 'interceptor.dart';
|
||||||
import 'service.dart';
|
import 'service.dart';
|
||||||
|
@ -84,13 +84,17 @@ class ServerTlsCredentials extends ServerCredentials {
|
||||||
class ConnectionServer {
|
class ConnectionServer {
|
||||||
final Map<String, Service> _services = {};
|
final Map<String, Service> _services = {};
|
||||||
final List<Interceptor> _interceptors;
|
final List<Interceptor> _interceptors;
|
||||||
|
final CodecRegistry _codecRegistry;
|
||||||
|
|
||||||
final _connections = <ServerTransportConnection>[];
|
final _connections = <ServerTransportConnection>[];
|
||||||
|
|
||||||
/// Create a server for the given [services].
|
/// Create a server for the given [services].
|
||||||
ConnectionServer(List<Service> services,
|
ConnectionServer(
|
||||||
[List<Interceptor> interceptors = const <Interceptor>[]])
|
List<Service> services, [
|
||||||
: _interceptors = interceptors {
|
List<Interceptor> interceptors = const <Interceptor>[],
|
||||||
|
CodecRegistry codecRegistry,
|
||||||
|
]) : _codecRegistry = codecRegistry,
|
||||||
|
_interceptors = interceptors {
|
||||||
for (final service in services) {
|
for (final service in services) {
|
||||||
_services[service.$name] = service;
|
_services[service.$name] = service;
|
||||||
}
|
}
|
||||||
|
@ -121,7 +125,8 @@ class ConnectionServer {
|
||||||
|
|
||||||
@visibleForTesting
|
@visibleForTesting
|
||||||
ServerHandler_ serveStream_(ServerTransportStream stream) {
|
ServerHandler_ serveStream_(ServerTransportStream stream) {
|
||||||
return ServerHandler_(lookupService, stream, _interceptors)..handle();
|
return ServerHandler_(lookupService, stream, _interceptors, _codecRegistry)
|
||||||
|
..handle();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,9 +138,11 @@ class Server extends ConnectionServer {
|
||||||
SecureServerSocket _secureServer;
|
SecureServerSocket _secureServer;
|
||||||
|
|
||||||
/// Create a server for the given [services].
|
/// Create a server for the given [services].
|
||||||
Server(List<Service> services,
|
Server(
|
||||||
[List<Interceptor> interceptors = const <Interceptor>[]])
|
List<Service> services, [
|
||||||
: super(services, interceptors);
|
List<Interceptor> interceptors = const <Interceptor>[],
|
||||||
|
CodecRegistry codecRegistry,
|
||||||
|
]) : super(services, interceptors, codecRegistry);
|
||||||
|
|
||||||
/// The port that the server is listening on, or `null` if the server is not
|
/// The port that the server is listening on, or `null` if the server is not
|
||||||
/// active.
|
/// active.
|
||||||
|
@ -193,7 +200,8 @@ class Server extends ConnectionServer {
|
||||||
|
|
||||||
@visibleForTesting
|
@visibleForTesting
|
||||||
ServerHandler_ serveStream_(ServerTransportStream stream) {
|
ServerHandler_ serveStream_(ServerTransportStream stream) {
|
||||||
return ServerHandler_(lookupService, stream, _interceptors)..handle();
|
return ServerHandler_(lookupService, stream, _interceptors, _codecRegistry)
|
||||||
|
..handle();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Deprecated(
|
@Deprecated(
|
||||||
|
|
|
@ -0,0 +1,68 @@
|
||||||
|
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
|
||||||
|
// for details. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
import 'package:archive/archive.dart';
|
||||||
|
|
||||||
|
abstract class Codec {
|
||||||
|
/// Returns the message encoding that this compressor uses.
|
||||||
|
///
|
||||||
|
/// This can be values such as "gzip", "deflate", "snappy", etc.
|
||||||
|
String get encodingName;
|
||||||
|
|
||||||
|
/// Wraps an existing output stream with a compressed output.
|
||||||
|
List<int> compress(List<int> data);
|
||||||
|
|
||||||
|
/// Wraps an existing output stream with a uncompressed input data.
|
||||||
|
List<int> decompress(List<int> data);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The "identity", or "none" codec.
|
||||||
|
///
|
||||||
|
/// This codec is special in that it can be used to explicitly disable Call
|
||||||
|
/// compression on a Channel that by default compresses.
|
||||||
|
class IdentityCodec implements Codec {
|
||||||
|
const IdentityCodec();
|
||||||
|
|
||||||
|
@override
|
||||||
|
String get encodingName => "identity";
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<int> compress(List<int> data) {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<int> decompress(List<int> data) {
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A gzip compressor and decompressor.
|
||||||
|
class GzipCodec implements Codec {
|
||||||
|
const GzipCodec();
|
||||||
|
|
||||||
|
@override
|
||||||
|
String get encodingName => "gzip";
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<int> compress(List<int> data) {
|
||||||
|
return GZipEncoder().encode(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
List<int> decompress(List<int> data) {
|
||||||
|
return GZipDecoder().decodeBytes(data);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,50 @@
|
||||||
|
// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file
|
||||||
|
// for details. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
import 'codec.dart';
|
||||||
|
|
||||||
|
/// Encloses classes related to the compression and decompression of messages.
|
||||||
|
class CodecRegistry {
|
||||||
|
CodecRegistry({List<Codec> codecs = const [IdentityCodec()]})
|
||||||
|
: assert(codecs != null),
|
||||||
|
_codecs = Map.fromIterable(codecs, key: (c) => c.encodingName),
|
||||||
|
_supportedEncodings = codecs.map((c) {
|
||||||
|
if (c.encodingName.contains(',')) {
|
||||||
|
throw ArgumentError.value(c.encodingName, 'codecs',
|
||||||
|
'contains entries with names containing ","');
|
||||||
|
}
|
||||||
|
return c.encodingName;
|
||||||
|
}).join(',') {
|
||||||
|
if (_codecs.length != codecs.length) {
|
||||||
|
throw ArgumentError.value(
|
||||||
|
codecs, 'codecs', 'contains multiple entries with the same name');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
factory CodecRegistry.empty() {
|
||||||
|
return CodecRegistry(codecs: []);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Key refers to the `encodingName` param from the [Codec].
|
||||||
|
final Map<String, Codec> _codecs;
|
||||||
|
|
||||||
|
final String _supportedEncodings;
|
||||||
|
|
||||||
|
Codec lookup(String codecName) {
|
||||||
|
return _codecs[codecName];
|
||||||
|
}
|
||||||
|
|
||||||
|
String get supportedEncodings => _supportedEncodings;
|
||||||
|
}
|
|
@ -16,6 +16,10 @@
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'dart:typed_data';
|
import 'dart:typed_data';
|
||||||
|
|
||||||
|
import 'codec.dart';
|
||||||
|
import 'codec_registry.dart';
|
||||||
|
import 'status.dart';
|
||||||
|
|
||||||
abstract class GrpcMessage {}
|
abstract class GrpcMessage {}
|
||||||
|
|
||||||
class GrpcMetadata extends GrpcMessage {
|
class GrpcMetadata extends GrpcMessage {
|
||||||
|
@ -54,25 +58,38 @@ class GrpcMessageSink extends Sink<GrpcMessage> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
List<int> frame(List<int> payload) {
|
List<int> frame(List<int> rawPayload, [Codec codec]) {
|
||||||
final payloadLength = payload.length;
|
final compressedPayload =
|
||||||
|
codec == null ? rawPayload : codec.compress(rawPayload);
|
||||||
|
final payloadLength = compressedPayload.length;
|
||||||
final bytes = Uint8List(payloadLength + 5);
|
final bytes = Uint8List(payloadLength + 5);
|
||||||
final header = bytes.buffer.asByteData(0, 5);
|
final header = bytes.buffer.asByteData(0, 5);
|
||||||
header.setUint8(0, 0); // TODO(dart-lang/grpc-dart#6): Handle compression
|
header.setUint8(0, codec == null ? 0 : 1);
|
||||||
header.setUint32(1, payloadLength);
|
header.setUint32(1, payloadLength);
|
||||||
bytes.setRange(5, bytes.length, payload);
|
bytes.setRange(5, bytes.length, compressedPayload);
|
||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor() =>
|
StreamTransformer<GrpcMessage, GrpcMessage> grpcDecompressor({
|
||||||
StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
|
CodecRegistry codecRegistry,
|
||||||
handleData: (GrpcMessage value, EventSink<GrpcMessage> sink) {
|
}) {
|
||||||
if (value is GrpcData) {
|
Codec codec;
|
||||||
if (value.isCompressed) {
|
return StreamTransformer<GrpcMessage, GrpcMessage>.fromHandlers(
|
||||||
// TODO(dart-lang/grpc-dart#6): Actually handle decompression.
|
handleData: (GrpcMessage value, EventSink<GrpcMessage> sink) {
|
||||||
sink.add(GrpcData(value.data, isCompressed: false));
|
if (value is GrpcData && value.isCompressed) {
|
||||||
return;
|
if (codec == null) {
|
||||||
}
|
sink.addError(
|
||||||
|
GrpcError.unimplemented('Compression mechanism not supported'),
|
||||||
|
);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
sink.add(value);
|
sink.add(GrpcData(codec.decompress(value.data), isCompressed: false));
|
||||||
});
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value is GrpcMetadata && value.metadata.containsKey('grpc-encoding')) {
|
||||||
|
codec = codecRegistry?.lookup(value.metadata['grpc-encoding']);
|
||||||
|
}
|
||||||
|
sink.add(value);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
name: grpc
|
name: grpc
|
||||||
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
|
description: Dart implementation of gRPC, a high performance, open-source universal RPC framework.
|
||||||
|
|
||||||
version: 2.8.0
|
version: 2.9.0-dev
|
||||||
|
|
||||||
homepage: https://github.com/dart-lang/grpc-dart
|
homepage: https://github.com/dart-lang/grpc-dart
|
||||||
|
|
||||||
|
@ -9,6 +9,7 @@ environment:
|
||||||
sdk: '>=2.8.0 <3.0.0'
|
sdk: '>=2.8.0 <3.0.0'
|
||||||
|
|
||||||
dependencies:
|
dependencies:
|
||||||
|
archive: ^2.0.13
|
||||||
async: ^2.2.0
|
async: ^2.2.0
|
||||||
crypto: ^2.1.4
|
crypto: ^2.1.4
|
||||||
fixnum: ^0.10.11
|
fixnum: ^0.10.11
|
||||||
|
|
|
@ -63,6 +63,30 @@ void main() {
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('Unary call attaches encoding headers', () async {
|
||||||
|
const requestValue = 17;
|
||||||
|
const responseValue = 19;
|
||||||
|
|
||||||
|
void handleRequest(StreamMessage message) {
|
||||||
|
final data = validateDataMessage(message);
|
||||||
|
expect(mockDecode(data.data), requestValue);
|
||||||
|
|
||||||
|
harness
|
||||||
|
..sendResponseHeader()
|
||||||
|
..sendResponseValue(responseValue)
|
||||||
|
..sendResponseTrailer();
|
||||||
|
}
|
||||||
|
|
||||||
|
await harness.runTest(
|
||||||
|
clientCall: harness.client.unary(requestValue,
|
||||||
|
options: CallOptions(metadata: {'grpc-accept-encoding': 'gzip'})),
|
||||||
|
expectedResult: responseValue,
|
||||||
|
expectedCustomHeaders: {'grpc-accept-encoding': 'gzip'},
|
||||||
|
expectedPath: '/Test/Unary',
|
||||||
|
serverHandlers: [handleRequest],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
test('Client-streaming calls work on the client', () async {
|
test('Client-streaming calls work on the client', () async {
|
||||||
const requests = [17, 3];
|
const requests = [17, 3];
|
||||||
const response = 12;
|
const response = 12;
|
||||||
|
|
|
@ -1,12 +1,14 @@
|
||||||
@TestOn('vm')
|
@TestOn('vm')
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'dart:io';
|
import 'dart:io';
|
||||||
|
|
||||||
import 'package:grpc/grpc.dart';
|
import 'package:grpc/grpc.dart';
|
||||||
import 'package:grpc/service_api.dart' as api;
|
import 'package:grpc/service_api.dart' as api;
|
||||||
import 'package:grpc/src/client/channel.dart' hide ClientChannel;
|
import 'package:grpc/src/client/channel.dart' hide ClientChannel;
|
||||||
import 'package:grpc/src/client/connection.dart';
|
import 'package:grpc/src/client/connection.dart';
|
||||||
import 'package:grpc/src/client/http2_connection.dart';
|
import 'package:grpc/src/client/http2_connection.dart';
|
||||||
import 'package:test/test.dart';
|
import 'package:test/test.dart';
|
||||||
|
|
||||||
import 'common.dart';
|
import 'common.dart';
|
||||||
|
|
||||||
class TestClient extends Client {
|
class TestClient extends Client {
|
||||||
|
@ -73,6 +75,30 @@ main() async {
|
||||||
server.shutdown();
|
server.shutdown();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
testTcpAndUds('round trip with outgoing and incoming compression',
|
||||||
|
(address) async {
|
||||||
|
final Server server = Server(
|
||||||
|
[TestService()], const [], CodecRegistry(codecs: const [GzipCodec()]));
|
||||||
|
await server.serve(address: address, port: 0);
|
||||||
|
|
||||||
|
final channel = FixedConnectionClientChannel(Http2ClientConnection(
|
||||||
|
address,
|
||||||
|
server.port,
|
||||||
|
ChannelOptions(
|
||||||
|
credentials: ChannelCredentials.insecure(),
|
||||||
|
codecRegistry: CodecRegistry(codecs: const [GzipCodec()]),
|
||||||
|
),
|
||||||
|
));
|
||||||
|
final testClient = TestClient(channel);
|
||||||
|
expect(
|
||||||
|
await testClient
|
||||||
|
.stream(TestService.requestFiniteStream,
|
||||||
|
options: CallOptions(compression: const GzipCodec()))
|
||||||
|
.toList(),
|
||||||
|
[1, 2, 3]);
|
||||||
|
await server.shutdown();
|
||||||
|
});
|
||||||
|
|
||||||
testTcpAndUds('round trip secure connection', (address) async {
|
testTcpAndUds('round trip secure connection', (address) async {
|
||||||
// round trip test of secure connection.
|
// round trip test of secure connection.
|
||||||
final Server server = Server([TestService()]);
|
final Server server = Server([TestService()]);
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
import 'package:grpc/src/shared/codec.dart';
|
||||||
|
import 'package:grpc/src/shared/codec_registry.dart';
|
||||||
|
import 'package:test/test.dart';
|
||||||
|
|
||||||
|
void main() {
|
||||||
|
test('CodecRegistry register adds new encodings', () {
|
||||||
|
final registry = CodecRegistry();
|
||||||
|
expect(registry.supportedEncodings, 'identity');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('CodecRegistry lookup', () {
|
||||||
|
final registry = CodecRegistry();
|
||||||
|
expect(registry.lookup('identity'), const IdentityCodec());
|
||||||
|
});
|
||||||
|
}
|
|
@ -16,14 +16,13 @@
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
import 'dart:convert';
|
import 'dart:convert';
|
||||||
|
|
||||||
|
import 'package:grpc/grpc.dart';
|
||||||
import 'package:grpc/src/client/channel.dart' as base;
|
import 'package:grpc/src/client/channel.dart' as base;
|
||||||
import 'package:grpc/src/client/http2_connection.dart';
|
import 'package:grpc/src/client/http2_connection.dart';
|
||||||
import 'package:grpc/src/shared/message.dart';
|
import 'package:grpc/src/shared/message.dart';
|
||||||
import 'package:http2/transport.dart';
|
import 'package:http2/transport.dart';
|
||||||
import 'package:test/test.dart';
|
|
||||||
import 'package:mockito/mockito.dart';
|
import 'package:mockito/mockito.dart';
|
||||||
|
import 'package:test/test.dart';
|
||||||
import 'package:grpc/grpc.dart';
|
|
||||||
|
|
||||||
import 'utils.dart';
|
import 'utils.dart';
|
||||||
|
|
||||||
|
@ -69,6 +68,7 @@ class FakeChannelOptions implements ChannelOptions {
|
||||||
Duration connectionTimeout = const Duration(seconds: 10);
|
Duration connectionTimeout = const Duration(seconds: 10);
|
||||||
String userAgent = 'dart-grpc/1.0.0 test';
|
String userAgent = 'dart-grpc/1.0.0 test';
|
||||||
BackoffStrategy backoffStrategy = testBackoff;
|
BackoffStrategy backoffStrategy = testBackoff;
|
||||||
|
CodecRegistry codecRegistry = CodecRegistry.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
class FakeChannel extends ClientChannel {
|
class FakeChannel extends ClientChannel {
|
||||||
|
|
|
@ -15,13 +15,12 @@
|
||||||
|
|
||||||
import 'dart:async';
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:grpc/grpc.dart';
|
||||||
import 'package:grpc/src/client/http2_connection.dart';
|
import 'package:grpc/src/client/http2_connection.dart';
|
||||||
import 'package:grpc/src/shared/message.dart';
|
import 'package:grpc/src/shared/message.dart';
|
||||||
import 'package:http2/transport.dart';
|
import 'package:http2/transport.dart';
|
||||||
import 'package:test/test.dart';
|
import 'package:test/test.dart';
|
||||||
|
|
||||||
import 'package:grpc/grpc.dart';
|
|
||||||
|
|
||||||
import 'utils.dart';
|
import 'utils.dart';
|
||||||
|
|
||||||
class TestService extends Service {
|
class TestService extends Service {
|
||||||
|
@ -189,7 +188,7 @@ abstract class _Harness {
|
||||||
Map<String, String> metadata,
|
Map<String, String> metadata,
|
||||||
Duration timeout}) {
|
Duration timeout}) {
|
||||||
final headers = Http2ClientConnection.createCallHeaders(
|
final headers = Http2ClientConnection.createCallHeaders(
|
||||||
true, authority, path, timeout, metadata,
|
true, authority, path, timeout, metadata, null,
|
||||||
userAgent: 'dart-grpc/1.0.0 test');
|
userAgent: 'dart-grpc/1.0.0 test');
|
||||||
toServer.add(HeadersStreamMessage(headers));
|
toServer.add(HeadersStreamMessage(headers));
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,8 +15,8 @@
|
||||||
|
|
||||||
import 'dart:convert';
|
import 'dart:convert';
|
||||||
|
|
||||||
import 'package:grpc/src/shared/streams.dart';
|
|
||||||
import 'package:grpc/src/shared/message.dart';
|
import 'package:grpc/src/shared/message.dart';
|
||||||
|
import 'package:grpc/src/shared/streams.dart';
|
||||||
import 'package:http2/transport.dart';
|
import 'package:http2/transport.dart';
|
||||||
import 'package:test/test.dart';
|
import 'package:test/test.dart';
|
||||||
|
|
||||||
|
@ -44,7 +44,6 @@ void validateRequestHeaders(Map<String, String> headers,
|
||||||
expect(headers['grpc-timeout'], timeout);
|
expect(headers['grpc-timeout'], timeout);
|
||||||
expect(headers['content-type'], 'application/grpc');
|
expect(headers['content-type'], 'application/grpc');
|
||||||
expect(headers['te'], 'trailers');
|
expect(headers['te'], 'trailers');
|
||||||
expect(headers['grpc-accept-encoding'], 'identity');
|
|
||||||
expect(headers['user-agent'], startsWith('dart-grpc/'));
|
expect(headers['user-agent'], startsWith('dart-grpc/'));
|
||||||
|
|
||||||
customHeaders?.forEach((key, value) {
|
customHeaders?.forEach((key, value) {
|
||||||
|
|
Loading…
Reference in New Issue