From b8197a5897acb6030dff43eb99a0d9849cb7501d Mon Sep 17 00:00:00 2001 From: Ben Konyi Date: Wed, 11 Nov 2020 06:51:16 -0800 Subject: [PATCH] Add timeline logging to grpc client (#392) Co-authored-by: Zichang Guo --- lib/grpc.dart | 3 + lib/grpc_web.dart | 2 + lib/src/client/call.dart | 58 +++++++++- lib/src/client/channel.dart | 9 +- lib/src/shared/profiler.dart | 36 ++++++ test/timeline_test.dart | 219 +++++++++++++++++++++++++++++++++++ 6 files changed, 321 insertions(+), 6 deletions(-) create mode 100644 lib/src/shared/profiler.dart create mode 100644 test/timeline_test.dart diff --git a/lib/grpc.dart b/lib/grpc.dart index 1917664..396386e 100644 --- a/lib/grpc.dart +++ b/lib/grpc.dart @@ -59,6 +59,9 @@ export 'src/server/server.dart' export 'src/server/service.dart' show ServiceMethod, Service; export 'src/shared/message.dart' show GrpcMessage, GrpcMetadata, GrpcData, grpcDecompressor; + +export 'src/shared/profiler.dart' show isTimelineLoggingEnabled; + export 'src/shared/security.dart' show supportedAlpnProtocols, createSecurityContext; export 'src/shared/status.dart' show StatusCode, GrpcError; diff --git a/lib/grpc_web.dart b/lib/grpc_web.dart index da8f17b..fa83439 100644 --- a/lib/grpc_web.dart +++ b/lib/grpc_web.dart @@ -23,4 +23,6 @@ export 'src/client/common.dart' show Response, ResponseStream, ResponseFuture; export 'src/client/web_channel.dart' show GrpcWebClientChannel; +export 'src/shared/profiler.dart' show isTimelineLoggingEnabled; + export 'src/shared/status.dart' show StatusCode, GrpcError; diff --git a/lib/src/client/call.dart b/lib/src/client/call.dart index e6dce07..a685c43 100644 --- a/lib/src/client/call.dart +++ b/lib/src/client/call.dart @@ -15,12 +15,14 @@ import 'dart:async'; import 'dart:convert'; +import 'dart:developer'; import 'package:grpc/src/generated/google/rpc/status.pb.dart'; import 'package:meta/meta.dart'; import 'package:protobuf/protobuf.dart'; import '../shared/message.dart'; +import '../shared/profiler.dart'; import '../shared/status.dart'; import 'common.dart'; import 'connection.dart'; @@ -174,7 +176,15 @@ class ClientCall implements Response { bool isCancelled = false; Timer _timeoutTimer; - ClientCall(this._method, this._requests, this.options) { + final TimelineTask _requestTimeline; + TimelineTask _responseTimeline; + + ClientCall(this._method, this._requests, this.options, + [this._requestTimeline]) { + _requestTimeline?.start('gRPC Request: ${_method.path}', arguments: { + 'method': _method.path, + 'timeout': options?.timeout?.toString(), + }); _responses = StreamController(onListen: _onResponseListen); if (options.timeout != null) { _timeoutTimer = Timer(options.timeout, _onTimedOut); @@ -186,6 +196,7 @@ class ClientCall implements Response { } void _terminateWithError(GrpcError error) { + _finishTimelineWithError(error, _requestTimeline); if (!_responses.isClosed) { _responses.addError(error); } @@ -240,8 +251,17 @@ class ClientCall implements Response { _terminateWithError(GrpcError.unavailable('Error making call: $e')); return; } + _requestTimeline?.instant('Request sent', arguments: { + 'metadata': metadata, + }); _requestSubscription = _requests - .map(_method.requestSerializer) + .map((data) { + _requestTimeline?.instant('Data sent', arguments: { + 'data': data.toString(), + }); + _requestTimeline?.finish(); + return _method.requestSerializer(data); + }) .handleError(_onRequestError) .listen(_stream.outgoingMessages.add, onError: _stream.outgoingMessages.addError, @@ -252,8 +272,16 @@ class ClientCall implements Response { _onResponseListen(); } + void _finishTimelineWithError(GrpcError error, TimelineTask timeline) { + timeline?.finish(arguments: { + 'error': error.toString(), + }); + } + void _onTimedOut() { - _responses.addError(GrpcError.deadlineExceeded('Deadline exceeded')); + final error = GrpcError.deadlineExceeded('Deadline exceeded'); + _finishTimelineWithError(error, _requestTimeline); + _responses.addError(error); _safeTerminate(); } @@ -278,6 +306,7 @@ class ClientCall implements Response { /// Emit an error response to the user, and tear down this call. void _responseError(GrpcError error, [StackTrace stackTrace]) { + _finishTimelineWithError(error, _responseTimeline); _responses.addError(error, stackTrace); _timeoutTimer?.cancel(); _requestSubscription?.cancel(); @@ -317,7 +346,11 @@ class ClientCall implements Response { return; } try { - _responses.add(_method.responseDeserializer(data.data)); + final decodedData = _method.responseDeserializer(data.data); + _responseTimeline?.instant('Data received', arguments: { + 'data': decodedData.toString(), + }); + _responses.add(decodedData); _hasReceivedResponses = true; } catch (e, s) { _responseError(GrpcError.dataLoss('Error parsing response'), s); @@ -325,6 +358,14 @@ class ClientCall implements Response { } else if (data is GrpcMetadata) { if (!_headers.isCompleted) { _headerMetadata = data.metadata; + if (_requestTimeline != null) { + _responseTimeline = timelineTaskFactory( + parent: _requestTimeline, filterKey: clientTimelineFilterKey); + } + _responseTimeline?.start('gRPC Response'); + _responseTimeline?.instant('Metadata received', arguments: { + 'headers': _headerMetadata.toString(), + }); _headers.complete(_headerMetadata); return; } @@ -333,6 +374,9 @@ class ClientCall implements Response { return; } final metadata = data.metadata; + _responseTimeline?.instant('Metadata received', arguments: { + 'trailers': metadata.toString(), + }); _trailers.complete(metadata); /// Process status error if necessary @@ -373,6 +417,7 @@ class ClientCall implements Response { /// Process status error if necessary _checkForErrorStatus(_headerMetadata); } + _responseTimeline?.finish(); _timeoutTimer?.cancel(); _responses.close(); _responseSubscription.cancel(); @@ -386,6 +431,7 @@ class ClientCall implements Response { error = GrpcError.unknown(error.toString()); } + _finishTimelineWithError(error, _requestTimeline); _responses.addError(error, stackTrace); _timeoutTimer?.cancel(); _responses.close(); @@ -405,7 +451,9 @@ class ClientCall implements Response { @override Future cancel() { if (!_responses.isClosed) { - _responses.addError(GrpcError.cancelled('Cancelled by client.')); + final error = GrpcError.cancelled('Cancelled by client.'); + _responses.addError(error); + _finishTimelineWithError(error, _requestTimeline); } return _terminate(); } diff --git a/lib/src/client/channel.dart b/lib/src/client/channel.dart index a7c4a5a..ad71d76 100644 --- a/lib/src/client/channel.dart +++ b/lib/src/client/channel.dart @@ -15,6 +15,7 @@ import 'dart:async'; +import '../shared/profiler.dart'; import '../shared/status.dart'; import 'call.dart'; @@ -75,7 +76,13 @@ abstract class ClientChannelBase implements ClientChannel { @override ClientCall createCall( ClientMethod method, Stream requests, CallOptions options) { - final call = ClientCall(method, requests, options); + final call = ClientCall( + method, + requests, + options, + isTimelineLoggingEnabled + ? timelineTaskFactory(filterKey: clientTimelineFilterKey) + : null); getConnection().then((connection) { if (call.isCancelled) return; connection.dispatchCall(call); diff --git a/lib/src/shared/profiler.dart b/lib/src/shared/profiler.dart new file mode 100644 index 0000000..3a3bb86 --- /dev/null +++ b/lib/src/shared/profiler.dart @@ -0,0 +1,36 @@ +// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:developer'; + +import 'package:meta/meta.dart'; + +@visibleForTesting +typedef TimelineTask TimelineTaskFactory( + {String filterKey, TimelineTask parent}); + +@visibleForTesting +TimelineTaskFactory timelineTaskFactory = _defaultTimelineTaskFactory; + +TimelineTask _defaultTimelineTaskFactory( + {String filterKey, TimelineTask parent}) => + TimelineTask(filterKey: filterKey, parent: parent); + +const String clientTimelineFilterKey = 'grpc/client'; + +/// Enable logging requests and response for clients. +/// +/// Logging is disabled by default. +bool isTimelineLoggingEnabled = false; diff --git a/test/timeline_test.dart b/test/timeline_test.dart new file mode 100644 index 0000000..5983a52 --- /dev/null +++ b/test/timeline_test.dart @@ -0,0 +1,219 @@ +// Copyright (c) 2020, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +@TestOn('vm') +import 'dart:async'; +import 'dart:developer'; + +import 'package:grpc/grpc.dart'; +import 'package:grpc/service_api.dart' as api; +import 'package:grpc/src/client/channel.dart' hide ClientChannel; +import 'package:grpc/src/client/connection.dart'; +import 'package:grpc/src/client/http2_connection.dart'; +import 'package:grpc/src/shared/profiler.dart'; +import 'package:mockito/mockito.dart'; +import 'package:test/test.dart'; + +const String path = '/test.TestService/stream'; + +class TestClient extends Client { + static final _$stream = ClientMethod( + path, (int value) => [value], (List value) => value[0]); + + TestClient(api.ClientChannel channel) : super(channel); + ResponseStream stream(int request, {CallOptions options}) { + final call = + $createCall(_$stream, Stream.fromIterable([request]), options: options); + return ResponseStream(call); + } +} + +class TestService extends Service { + String get $name => 'test.TestService'; + + TestService() { + $addMethod(ServiceMethod('stream', stream, false, true, + (List value) => value[0], (int value) => [value])); + } + + Stream stream(ServiceCall call, Future request) async* { + yield 1; + yield 2; + yield 3; + } +} + +class FixedConnectionClientChannel extends ClientChannelBase { + final Http2ClientConnection clientConnection; + List states = []; + FixedConnectionClientChannel(this.clientConnection) { + clientConnection.onStateChanged = (c) => states.add(c.state); + } + @override + ClientConnection createConnection() => clientConnection; +} + +class FakeTimelineTask extends Fake implements TimelineTask { + static final List tasks = []; + static final List events = []; + static int _idCount = 0; + + final String filterKey; + final TimelineTask parent; + final int id = _idCount++; + int _startFinishCount = 0; + + factory FakeTimelineTask({TimelineTask parent, String filterKey}) { + final task = FakeTimelineTask._(parent: parent, filterKey: filterKey); + tasks.add(task); + return task; + } + + FakeTimelineTask._({this.parent, this.filterKey}); + + bool get isComplete => _startFinishCount == 0; + + void start(String name, {Map arguments}) { + events.add({ + 'id': id, + 'ph': 'b', + 'name': name, + 'args': { + if (filterKey != null) 'filterKey': filterKey, + if (parent != null) 'parentId': (parent as FakeTimelineTask).id, + if (arguments != null) ...arguments, + } + }); + ++_startFinishCount; + } + + void instant(String name, {Map arguments}) { + events.add({ + 'id': id, + 'ph': 'i', + 'name': name, + 'args': { + if (filterKey != null) 'filterKey': filterKey, + if (arguments != null) ...arguments, + } + }); + } + + void finish({Map arguments}) { + events.add({ + 'id': id, + 'ph': 'e', + 'args': { + if (filterKey != null) 'filterKey': filterKey, + if (arguments != null) ...arguments, + } + }); + --_startFinishCount; + expect(_startFinishCount >= 0, true); + } +} + +TimelineTask fakeTimelineTaskFactory({String filterKey, TimelineTask parent}) => + FakeTimelineTask(filterKey: filterKey, parent: parent); + +testee() async { + final Server server = Server([TestService()]); + await server.serve(address: 'localhost', port: 0); + isTimelineLoggingEnabled = true; + timelineTaskFactory = fakeTimelineTaskFactory; + final channel = FixedConnectionClientChannel(Http2ClientConnection( + 'localhost', + server.port, + ChannelOptions(credentials: ChannelCredentials.insecure()), + )); + final testClient = TestClient(channel); + await testClient.stream(1).toList(); + await server.shutdown(); +} + +void checkStartEvent(List events) { + final e = events.where((e) => e['ph'] == 'b').toList(); + expect(e.length, 2); + + expect(e[0]['name'], 'gRPC Request: $path'); + expect(e[0]['id'], 0); + expect(e[0]['args']['method'], isNotNull); + expect(e[0]['args']['method'], equals(path)); + + expect(e[1]['name'], 'gRPC Response'); + expect(e[1]['id'], 1); + expect(e[1]['args']['parentId'], 0); +} + +void checkSendEvent(List events) { + final e = events.firstWhere((e) => e['name'] == 'Request sent'); + expect(e['args']['metadata'], isNotNull); +} + +void checkWriteEvent(List events) { + final e = events.firstWhere((e) => e['name'] == 'Data sent'); + expect(e['args']['data'], isNotNull); + expect(e['args']['data'], equals('1')); +} + +void checkReceiveEvent(List events) { + events = events.where((e) => e['name'] == 'Data received').toList(); + expect(events.length, equals(3)); + int sum = 0; + for (final e in events) { + expect(e['id'], 1); + // 3 elements are 1, 2 and 3. + sum |= 1 << int.parse(e['args']['data']); + } + expect(sum, equals(14)); +} + +void checkReceiveMetaDataEvent(List events) { + events = events.where((e) => e['name'] == 'Metadata received').toList(); + expect(events.length, equals(2)); + for (final e in events) { + expect(e['id'], 1); + if (e['args']['headers'] != null) { + final header = e['args']['headers']; + expect(header, contains('status: 200')); + expect(header, contains('content-type: application/grpc')); + } else { + expect(e['args']['trailers'], contains('grpc-status: 0')); + } + } +} + +void checkFinishEvent(List events) { + final e = events.where((e) => e['ph'] == 'e').toList(); + expect(e.length, 2); +} + +main([args = const []]) { + test('Test gRPC timeline logging', () async { + await testee(); + for (final task in FakeTimelineTask.tasks) { + expect(task.isComplete, true); + } + final events = FakeTimelineTask.events + .where((e) => e['args']['filterKey'] == 'grpc/client') + .toList(); + checkStartEvent(events); + checkSendEvent(events); + checkWriteEvent(events); + checkReceiveEvent(events); + checkReceiveMetaDataEvent(events); + checkFinishEvent(events); + }); +}