Expose client IP address to server (#590)

This commit is contained in:
Ben Getsug 2022-12-13 02:12:28 -06:00 committed by GitHub
parent 4dc6e2b252
commit d0e3a4c706
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 64 additions and 22 deletions

View File

@ -7,6 +7,7 @@
`connectTimeout`. On timeout, a `SocketException` is thrown.
* Require Dart 2.17 or greater.
* Fix issue [#51](https://github.com/grpc/grpc-dart/issues/51), add support for custom error handling.
* Expose client IP address to server
## 3.1.0

View File

@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import '../shared/io_bits/io_bits.dart' show X509Certificate;
import '../shared/io_bits/io_bits.dart' show InternetAddress, X509Certificate;
/// Server-side context for a gRPC call.
///
@ -44,6 +44,9 @@ abstract class ServiceCall {
/// Returns the client certificate if it is requested and available
X509Certificate? get clientCertificate;
/// Returns the IP address of the client, if available
InternetAddress? get remoteAddress;
/// Send response headers. This is done automatically before sending the first
/// response message, but can be done manually before the first response is
/// ready, if necessary.

View File

@ -20,7 +20,7 @@ import 'package:http2/transport.dart';
import '../shared/codec.dart';
import '../shared/codec_registry.dart';
import '../shared/io_bits/io_bits.dart' show X509Certificate;
import '../shared/io_bits/io_bits.dart' show InternetAddress, X509Certificate;
import '../shared/message.dart';
import '../shared/status.dart';
import '../shared/streams.dart';
@ -63,7 +63,9 @@ class ServerHandler extends ServiceCall {
bool _isCanceled = false;
bool _isTimedOut = false;
Timer? _timeoutTimer;
final X509Certificate? _clientCertificate;
final InternetAddress? _remoteAddress;
ServerHandler({
required ServerTransportStream stream,
@ -71,12 +73,14 @@ class ServerHandler extends ServiceCall {
required List<Interceptor> interceptors,
required CodecRegistry? codecRegistry,
X509Certificate? clientCertificate,
InternetAddress? remoteAddress,
GrpcErrorHandler? errorHandler,
}) : _stream = stream,
_serviceLookup = serviceLookup,
_interceptors = interceptors,
_codecRegistry = codecRegistry,
_clientCertificate = clientCertificate,
_remoteAddress = remoteAddress,
_errorHandler = errorHandler;
@override
@ -100,14 +104,21 @@ class ServerHandler extends ServiceCall {
@override
X509Certificate? get clientCertificate => _clientCertificate;
@override
InternetAddress? get remoteAddress => _remoteAddress;
void handle() {
_stream.onTerminated = (_) => cancel();
_incomingSubscription = _stream.incomingMessages
.transform(GrpcHttpDecoder())
.transform(grpcDecompressor(codecRegistry: _codecRegistry))
.listen(_onDataIdle,
onError: _onError, onDone: _onDoneError, cancelOnError: true);
.listen(
_onDataIdle,
onError: _onError,
onDone: _onDoneError,
cancelOnError: true,
);
_stream.outgoingMessages.done.then((_) {
cancel();
});
@ -342,8 +353,11 @@ class ServerHandler extends ServiceCall {
}
@override
void sendTrailers(
{int? status = 0, String? message, Map<String, String>? errorTrailers}) {
void sendTrailers({
int? status = 0,
String? message,
Map<String, String>? errorTrailers,
}) {
_timeoutTimer?.cancel();
final outgoingTrailersMap = <String, String>{};

View File

@ -107,16 +107,21 @@ class ConnectionServer {
Service? lookupService(String service) => _services[service];
Future<void> serveConnection(
ServerTransportConnection connection, [
Future<void> serveConnection({
required ServerTransportConnection connection,
X509Certificate? clientCertificate,
]) async {
InternetAddress? remoteAddress,
}) async {
_connections.add(connection);
ServerHandler? handler;
// TODO(jakobr): Set active state handlers, close connection after idle
// timeout.
connection.incomingStreams.listen((stream) {
handler = serveStream_(stream, clientCertificate);
handler = serveStream_(
stream: stream,
clientCertificate: clientCertificate,
remoteAddress: remoteAddress,
);
}, onError: (error, stackTrace) {
if (error is Error) {
Zone.current.handleUncaughtError(error, stackTrace);
@ -132,10 +137,11 @@ class ConnectionServer {
}
@visibleForTesting
ServerHandler serveStream_(
ServerTransportStream stream, [
ServerHandler serveStream_({
required ServerTransportStream stream,
X509Certificate? clientCertificate,
]) {
InternetAddress? remoteAddress,
}) {
return ServerHandler(
stream: stream,
serviceLookup: lookupService,
@ -143,6 +149,8 @@ class ConnectionServer {
codecRegistry: _codecRegistry,
// ignore: unnecessary_cast
clientCertificate: clientCertificate as io_bits.X509Certificate?,
// ignore: unnecessary_cast
remoteAddress: remoteAddress as io_bits.InternetAddress?,
errorHandler: _errorHandler,
)..handle();
}
@ -228,13 +236,23 @@ class Server extends ConnectionServer {
if (socket.address.type != InternetAddressType.unix) {
socket.setOption(SocketOption.tcpNoDelay, true);
}
X509Certificate? clientCertificate;
if (socket is SecureSocket) {
clientCertificate = socket.peerCertificate;
}
final connection = ServerTransportConnection.viaSocket(socket,
settings: http2ServerSettings);
serveConnection(connection, clientCertificate);
final connection = ServerTransportConnection.viaSocket(
socket,
settings: http2ServerSettings,
);
serveConnection(
connection: connection,
clientCertificate: clientCertificate,
remoteAddress: socket.remoteAddress,
);
}, onError: (error, stackTrace) {
if (error is Error) {
Zone.current.handleUncaughtError(error, stackTrace);
@ -244,10 +262,11 @@ class Server extends ConnectionServer {
@override
@visibleForTesting
ServerHandler serveStream_(
ServerTransportStream stream, [
ServerHandler serveStream_({
required ServerTransportStream stream,
X509Certificate? clientCertificate,
]) {
InternetAddress? remoteAddress,
}) {
return ServerHandler(
stream: stream,
serviceLookup: lookupService,
@ -255,6 +274,8 @@ class Server extends ConnectionServer {
codecRegistry: _codecRegistry,
// ignore: unnecessary_cast
clientCertificate: clientCertificate as io_bits.X509Certificate?,
// ignore: unnecessary_cast
remoteAddress: remoteAddress as io_bits.InternetAddress?,
errorHandler: _errorHandler,
)..handle();
}
@ -262,7 +283,7 @@ class Server extends ConnectionServer {
@Deprecated(
'This is internal functionality, and will be removed in next major version.')
void serveStream(ServerTransportStream stream) {
serveStream_(stream);
serveStream_(stream: stream);
}
Future<void> shutdown() async {

View File

@ -13,4 +13,4 @@
// See the License for the specific language governing permissions and
// limitations under the License.
export 'dart:io' show HttpStatus, X509Certificate;
export 'dart:io' show HttpStatus, InternetAddress, X509Certificate;

View File

@ -15,6 +15,9 @@
export 'dart:html' show HttpStatus;
/// Unavailable on the web
class InternetAddress {}
/// Should not be used on the Web, but is pulled through [ServiceCall] class
/// which is used in the protoc generated code.
class X509Certificate {}

View File

@ -157,7 +157,7 @@ abstract class _Harness {
void setUp() {
final stream = TestServerStream(toServer.stream, fromServer.sink);
server.serveStream_(stream);
server.serveStream_(stream: stream);
}
void tearDown() {