/** * @license * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ 'use strict'; const options = { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }; var _ = require('lodash'); var assert = require('assert'); const anyGrpc = require('../any_grpc'); const clientGrpc = anyGrpc.client const serverGrpc = anyGrpc.server; const protoLoader = require('../../packages/proto-loader'); var insecureCreds = clientGrpc.credentials.createInsecure(); const echoProtoDef = protoLoader.loadSync(__dirname + '/../proto/echo_service.proto', options); const EchoClient = clientGrpc.loadPackageDefinition(echoProtoDef).EchoService; const echo_service = echoProtoDef.EchoService; var StatusBuilder = clientGrpc.StatusBuilder; var ListenerBuilder = clientGrpc.ListenerBuilder; var InterceptingCall = clientGrpc.InterceptingCall; var RequesterBuilder = clientGrpc.RequesterBuilder; const Metadata = clientGrpc.Metadata; var CallRegistry = function(done, expectation, is_ordered, is_verbose) { this.call_map = {}; this.call_array = []; this.done = done; this.expectation = expectation; this.expectation_is_array = Array.isArray(this.expectation); this.is_ordered = is_ordered; this.is_verbose = is_verbose; if (is_verbose) { console.log('Expectation: ', expectation); } }; CallRegistry.prototype.addCall = function(call_name) { if (this.expectation_is_array) { this.call_array.push(call_name); if (this.is_verbose) { console.log(this.call_array); } } else { if (!this.call_map[call_name]) { this.call_map[call_name] = 0; } this.call_map[call_name]++; if (this.is_verbose) { console.log(this.call_map); } } this.maybeCallDone(); }; CallRegistry.prototype.maybeCallDone = function() { if (this.expectation_is_array) { if (this.is_ordered) { if (this.expectation && _.isEqual(this.expectation, this.call_array)) { this.done(); } } else { var intersection = _.intersectionWith(this.expectation, this.call_array, _.isEqual); if (intersection.length === this.expectation.length) { this.done(); } } } else if (this.expectation && _.isEqual(this.expectation, this.call_map)) { this.done(); } }; describe(`${anyGrpc.clientName} client -> ${anyGrpc.serverName} server`, function() { describe('Client interceptors', function() { var echo_server; var echo_port; var client; function startServer(done) { echo_server = new serverGrpc.Server(); echo_server.addService(echo_service, { echo: function(call, callback) { call.sendMetadata(call.metadata); if (call.request.value === 'error') { var status = { code: 2, message: 'test status message' }; status.metadata = call.metadata; callback(status, null); return; } callback(null, call.request); }, echoClientStream: function(call, callback){ call.sendMetadata(call.metadata); var payload; var err = null; call.on('data', function(data) { if (data.value === 'error') { err = { code: 2, message: 'test status message' }; err.metadata = call.metadata; return; } payload = data; }); call.on('end', function() { callback(err, payload, call.metadata); }); }, echoServerStream: function(call) { call.sendMetadata(call.metadata); if (call.request.value === 'error') { var status = { code: 2, message: 'test status message' }; status.metadata = call.metadata; call.emit('error', status); return; } call.write(call.request); call.end(call.metadata); }, echoBidiStream: function(call) { call.sendMetadata(call.metadata); call.on('data', function(data) { if (data.value === 'error') { var status = { code: 2, message: 'test status message' }; call.emit('error', status); return; } call.write(data); }); call.on('end', function() { call.end(call.metadata); }); } }); var server_credentials = serverGrpc.ServerCredentials.createInsecure(); echo_server.bindAsync('localhost:0', server_credentials, (error, port) => { assert.ifError(error); echo_port = port; echo_server.start(); done(); }); } function stopServer() { echo_server.forceShutdown(); } function resetClient() { client = new EchoClient('localhost:' + echo_port, insecureCreds); } before(function(done) { startServer(done); }); beforeEach(function() { resetClient(); }); after(function() { stopServer(); }); describe('execute downstream interceptors when a new call is made outbound', function() { var registry; var options; before(function() { var interceptor_a = function (options, nextCall) { var stored_listener; var stored_metadata; options.call_number = 1; registry.addCall('construct a ' + options.call_number); return new InterceptingCall(nextCall(options), { start: function (metadata, listener, next) { registry.addCall('start a ' + options.call_number); stored_listener = listener; stored_metadata = metadata; next(metadata, listener); }, sendMessage: function (message, next) { registry.addCall('send a ' + options.call_number); var options2 = _.clone(options); options2.call_number = 2; var second_call = nextCall(options2); second_call.start(stored_metadata); second_call.sendMessage(message); second_call.halfClose(); next(message); }, halfClose: function (next) { registry.addCall('close a ' + options.call_number); next(); } }); }; var interceptor_b = function (options, nextCall) { registry.addCall('construct b ' + options.call_number); return new InterceptingCall(nextCall(options), { start: function (metadata, listener, next) { registry.addCall('start b ' + options.call_number); next(metadata, listener); }, sendMessage: function (message, next) { registry.addCall('send b ' + options.call_number); next(message); }, halfClose: function (next) { registry.addCall('close b ' + options.call_number); next(); } }); }; options = { interceptors: [interceptor_a, interceptor_b] }; }); var expected_calls = [ 'construct a 1', 'construct b 1', 'start a 1', 'start b 1', 'send a 1', 'construct b 2', 'start b 2', 'send b 2', 'close b 2', 'send b 1', 'close a 1', 'close b 1', 'response' ]; it('with unary call', function(done) { registry = new CallRegistry(done, expected_calls, true); var message = {}; message.value = 'foo'; client.echo(message, options, function(err, response){ if (!err) { registry.addCall('response'); } }); }); it('with client streaming call', function(done) { registry = new CallRegistry(done, expected_calls, false); var message = {}; message.value = 'foo'; var stream = client.echoClientStream(options, function(err, response) { if (!err) { registry.addCall('response'); } }); stream.write(message); stream.end(); }); it('with server streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var message = {}; message.value = 'foo'; var stream = client.echoServerStream(message, options); stream.on('data', function(data) { registry.addCall('response'); }); }); it('with bidi streaming call', function(done) { registry = new CallRegistry( done, expected_calls, true); var message = {}; message.value = 'foo'; var stream = client.echoBidiStream(options); stream.on('data', function(data) { registry.addCall('response'); }); stream.write(message); stream.end(); }); }); describe('execute downstream interceptors when a new call is made inbound', function() { var registry; var options; before(function() { var interceptor_a = function (options, nextCall) { return new InterceptingCall(nextCall(options), { start: function (metadata, listener, next) { next(metadata, { onReceiveMetadata: function (metadata, next) { }, onReceiveMessage: function (message, next) { registry.addCall('interceptor_a'); var second_call = nextCall(options); second_call.start(metadata, listener); second_call.sendMessage(message); second_call.halfClose(); }, onReceiveStatus: function (status, next) { } }); } }); }; var interceptor_b = function (options, nextCall) { return new InterceptingCall(nextCall(options), { start: function (metadata, listener, next) { next(metadata, { onReceiveMessage: function (message, next) { registry.addCall('interceptor_b'); next(message); } }); } }); }; options = { interceptors: [interceptor_a, interceptor_b] }; }); var expected_calls = ['interceptor_b', 'interceptor_a', 'interceptor_b', 'response']; it('with unary call', function(done) { registry = new CallRegistry(done, expected_calls, true); var message = {}; message.value = 'foo'; client.echo(message, options, function(err) { assert.ifError(err); registry.addCall('response'); }); }); it('with client streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var message = {}; message.value = 'foo'; var stream = client.echoClientStream(options, function(err, response) { assert.ifError(err); registry.addCall('response'); }); stream.write(message); stream.end(); }); }); it('will delay operations and short circuit unary requests', function(done) { var registry = new CallRegistry(done, ['foo_miss', 'foo_hit', 'bar_miss', 'foo_hit_done', 'foo_miss_done', 'bar_miss_done']); var cache = {}; var _getCachedResponse = function(value) { return cache[value]; }; var _store = function(key, value) { cache[key] = value; }; var interceptor = function(options, nextCall) { var savedMetadata; var startNext; var storedListener; var storedMessage; var messageNext; var requester = (new RequesterBuilder()) .withStart(function(metadata, listener, next) { savedMetadata = metadata; storedListener = listener; startNext = next; }) .withSendMessage(function(message, next) { storedMessage = message; messageNext = next; }) .withHalfClose(function(next) { var cachedValue = _getCachedResponse(storedMessage.value); if (cachedValue) { var cachedMessage = {}; cachedMessage.value = cachedValue; registry.addCall(storedMessage.value + '_hit'); storedListener.onReceiveMetadata(new Metadata()); storedListener.onReceiveMessage(cachedMessage); storedListener.onReceiveStatus( (new StatusBuilder()).withCode(clientGrpc.status.OK).build()); } else { registry.addCall(storedMessage.value + '_miss'); var newListener = (new ListenerBuilder()).withOnReceiveMessage( function(message, next) { _store(storedMessage.value, message.value); next(message); }).build(); startNext(savedMetadata, newListener); messageNext(storedMessage); next(); } }) .withCancel(function(message, next) { next(); }).build(); return new InterceptingCall(nextCall(options), requester); }; var options = { interceptors: [interceptor] }; var foo_message = {}; foo_message.value = 'foo'; client.echo(foo_message, options, function(err, response){ assert.equal(response.value, 'foo'); registry.addCall('foo_miss_done'); client.echo(foo_message, options, function(err, response){ assert.equal(response.value, 'foo'); registry.addCall('foo_hit_done'); }); }); var bar_message = {}; bar_message.value = 'bar'; client.echo(bar_message, options, function(err, response) { assert.equal(response.value, 'bar'); registry.addCall('bar_miss_done'); }); }); it('can retry failed messages and handle eventual success', function(done) { var registry = new CallRegistry(done, ['retry_foo_1', 'retry_foo_2', 'retry_foo_3', 'foo_result', 'retry_bar_1', 'bar_result']); var maxRetries = 3; var retry_interceptor = function(options, nextCall) { var savedMetadata; var savedSendMessage; var savedReceiveMessage; var savedMessageNext; var requester = (new RequesterBuilder()) .withStart(function(metadata, listener, next) { savedMetadata = metadata; var new_listener = (new ListenerBuilder()) .withOnReceiveMessage(function(message, next) { savedReceiveMessage = message; savedMessageNext = next; }) .withOnReceiveStatus(function(status, next) { var retries = 0; var retry = function(message, metadata) { retries++; var newCall = nextCall(options); var receivedMessage; newCall.start(metadata, { onReceiveMessage: function(message) { receivedMessage = message; }, onReceiveStatus: function(status) { registry.addCall('retry_' + savedMetadata.get('name') + '_' + retries); if (status.code !== clientGrpc.status.OK) { if (retries <= maxRetries) { retry(message, metadata); } else { savedMessageNext(receivedMessage); next(status); } } else { registry.addCall('success_call'); var new_status = (new StatusBuilder()) .withCode(clientGrpc.status.OK).build(); savedMessageNext(receivedMessage); next(new_status); } } }); newCall.sendMessage(message); newCall.halfClose(); }; if (status.code !== clientGrpc.status.OK) { // Change the message we're sending only for test purposes // so the server will respond without error var newMessage = (savedMetadata.get('name')[0] === 'bar') ? {value: 'bar'} : savedSendMessage; retry(newMessage, savedMetadata); } else { savedMessageNext(savedReceiveMessage); next(status); } } ).build(); next(metadata, new_listener); }) .withSendMessage(function(message, next) { savedSendMessage = message; next(message); }).build(); return new InterceptingCall(nextCall(options), requester); }; var options = { interceptors: [retry_interceptor] }; // Make a call which the server will return a non-OK status for var foo_message = {value: 'error'}; var foo_metadata = new Metadata(); foo_metadata.set('name', 'foo'); client.echo(foo_message, foo_metadata, options, function(err, response) { assert.strictEqual(err.code, 2); registry.addCall('foo_result'); }); // Make a call which will fail the first time and succeed on the first // retry var bar_message = {value: 'error'}; var bar_metadata = new Metadata(); bar_metadata.set('name', 'bar'); client.echo(bar_message, bar_metadata, options, function(err, response) { assert.strictEqual(response.value, 'bar'); registry.addCall('bar_result'); }); }); it('can retry and preserve interceptor order on success', function(done) { var registry = new CallRegistry(done, ['interceptor_c', 'retry_interceptor', 'fail_call', 'interceptor_c', 'success_call', 'interceptor_a', 'result'], true); var interceptor_a = function(options, nextCall) { var requester = (new RequesterBuilder()) .withStart(function(metadata, listener, next) { var new_listener = (new ListenerBuilder()) .withOnReceiveMessage(function(message, next) { registry.addCall('interceptor_a'); next(message); }).build(); next(metadata, new_listener); }).build(); return new InterceptingCall(nextCall(options), requester); }; var retry_interceptor = function(options, nextCall) { var savedMetadata; var savedMessage; var savedMessageNext; var sendMessageNext; var originalMessage; var startNext; var originalListener; var requester = (new RequesterBuilder()) .withStart(function(metadata, listener, next) { startNext = next; savedMetadata = metadata; originalListener = listener; var new_listener = (new ListenerBuilder()) .withOnReceiveMessage(function(message, next) { savedMessage = message; savedMessageNext = next; }) .withOnReceiveStatus(function(status, next) { var retries = 0; var maxRetries = 1; var receivedMessage; var retry = function(message, metadata) { retries++; var new_call = nextCall(options); new_call.start(metadata, { onReceiveMessage: function(message) { receivedMessage = message; }, onReceiveStatus: function(status) { if (status.code !== clientGrpc.status.OK) { if (retries <= maxRetries) { retry(message, metadata); } else { savedMessageNext(receivedMessage); next(status); } } else { registry.addCall('success_call'); var new_status = (new StatusBuilder()) .withCode(clientGrpc.status.OK).build(); savedMessageNext(receivedMessage); next(new_status); } } }); new_call.sendMessage(message); new_call.halfClose(); }; registry.addCall('retry_interceptor'); if (status.code !== clientGrpc.status.OK) { registry.addCall('fail_call'); var newMessage = {value: 'foo'}; retry(newMessage, savedMetadata); } else { savedMessageNext(savedMessage); next(status); } }).build(); next(metadata, new_listener); }) .withSendMessage(function(message, next) { sendMessageNext = next; originalMessage = message; next(message); }) .build(); return new InterceptingCall(nextCall(options), requester); }; var interceptor_c = function(options, nextCall) { var requester = (new RequesterBuilder()) .withStart(function(metadata, listener, next) { var new_listener = (new ListenerBuilder()) .withOnReceiveMessage(function(message, next) { registry.addCall('interceptor_c'); next(message); }).build(); next(metadata, new_listener); }).build(); return new InterceptingCall(nextCall(options), requester); }; var options = { interceptors: [interceptor_a, retry_interceptor, interceptor_c] }; var message = {value: 'error'}; client.echo(message, options, function(err, response) { assert.ifError(err); assert.strictEqual(response.value, 'foo'); registry.addCall('result'); }); }); describe('handle interceptor errors', function () { var options; before(function () { var foo_interceptor = function (options, nextCall) { var savedListener; var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { savedListener = listener; next(metadata, listener); }) .withSendMessage(function (message, next) { savedListener.onReceiveMetadata(new Metadata()); savedListener.onReceiveMessage({ value: 'failed' }); var error_status = (new StatusBuilder()) .withCode(16) .withDetails('Error in foo interceptor') .build(); savedListener.onReceiveStatus(error_status); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [foo_interceptor] }; }); it('with unary call', function(done) { var message = {}; client.echo(message, options, function(err, response) { assert.strictEqual(err.code, 16); assert.strictEqual(err.message, '16 UNAUTHENTICATED: Error in foo interceptor'); done(); }); }); }); describe('implement fallbacks for streaming RPCs', function() { var options; before(function () { var fallback_response = { value: 'fallback' }; var interceptor = function (options, nextCall) { var savedMessage; var savedMessageNext; var requester = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { var new_listener = (new ListenerBuilder()) .withOnReceiveMessage(function (message, next) { savedMessage = message; savedMessageNext = next; }) .withOnReceiveStatus(function (status, next) { if (status.code !== clientGrpc.status.OK) { savedMessageNext(fallback_response); next((new StatusBuilder()).withCode(clientGrpc.status.OK).build()); } else { savedMessageNext(savedMessage); next(status); } }).build(); next(metadata, new_listener); }).build(); return new InterceptingCall(nextCall(options), requester); }; options = { interceptors: [interceptor] }; }); it('with client streaming call', function (done) { var registry = new CallRegistry(done, ['foo_result', 'fallback_result']); var stream = client.echoClientStream(options, function (err, response) { assert.ifError(err); assert.strictEqual(response.value, 'foo'); registry.addCall('foo_result'); }); stream.write({ value: 'foo' }); stream.end(); stream = client.echoClientStream(options, function(err, response) { assert.ifError(err); assert.strictEqual(response.value, 'fallback'); registry.addCall('fallback_result'); }); stream.write({value: 'error'}); stream.end(); }); }); describe('allows the call options to be modified for downstream interceptors', function() { var done; var options; var method_name; var method_path_last; before(function() { var interceptor_a = function (options, nextCall) { options.deadline = 10; return new InterceptingCall(nextCall(options)); }; var interceptor_b = function (options, nextCall) { assert.equal(options.method_definition.path, '/EchoService/' + method_path_last); assert.equal(options.deadline, 10); done(); return new InterceptingCall(nextCall(options)); }; options = { interceptors: [interceptor_a, interceptor_b], deadline: 100 }; }); it('with unary call', function(cb) { done = cb; var metadata = new Metadata(); var message = {}; method_name = 'echo'; method_path_last = 'Echo'; client.echo(message, metadata, options, function(){}); }); it('with client streaming call', function(cb) { done = cb; var metadata = new Metadata(); method_name = 'echoClientStream'; method_path_last = 'EchoClientStream'; client.echoClientStream(metadata, options, function() {}); }); it('with server streaming call', function(cb) { done = cb; var metadata = new Metadata(); var message = {}; method_name = 'echoServerStream'; method_path_last = 'EchoServerStream'; const stream = client.echoServerStream(message, metadata, options); stream.on('error', () => {}); }); it('with bidi streaming call', function(cb) { done = cb; var metadata = new Metadata(); method_name = 'echoBidiStream'; method_path_last = 'EchoBidiStream'; const stream = client.echoBidiStream(metadata, options); stream.on('error', () => {}); }); }); describe('pass accurate MethodDefinitions', function() { var registry; var initial_value = 'broken'; var expected_value = 'working'; var options; before(function() { var interceptor = function (options, nextCall) { registry.addCall({ path: options.method_definition.path, requestStream: options.method_definition.requestStream, responseStream: options.method_definition.responseStream }); var outbound = (new RequesterBuilder()) .withSendMessage(function (message, next) { message.value = expected_value; next(message); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [interceptor] }; }); it('with unary call', function(done) { var unary_definition = { path: '/EchoService/Echo', requestStream: false, responseStream: false }; registry = new CallRegistry(done, [ unary_definition, 'result_unary' ]); var metadata = new Metadata(); var message = {value: initial_value}; client.echo(message, metadata, options, function(err, response){ assert.equal(response.value, expected_value); registry.addCall('result_unary'); }); }); it('with client streaming call', function(done) { var client_stream_definition = { path: '/EchoService/EchoClientStream', requestStream: true, responseStream: false }; registry = new CallRegistry(done, [ client_stream_definition, 'result_client_stream' ]); var metadata = new Metadata(); var message = {value: initial_value}; var client_stream = client.echoClientStream(metadata, options, function(err, response) { assert.strictEqual(response.value, expected_value); registry.addCall('result_client_stream'); }); client_stream.write(message); client_stream.end(); }); it('with server streaming call', function(done) { var server_stream_definition = { path: '/EchoService/EchoServerStream', responseStream: true, requestStream: false, }; registry = new CallRegistry(done, [ server_stream_definition, 'result_server_stream' ]); var metadata = new Metadata(); var message = {value: initial_value}; var server_stream = client.echoServerStream(message, metadata, options); server_stream.on('data', function(data) { assert.strictEqual(data.value, expected_value); registry.addCall('result_server_stream'); }); }); it('with bidi streaming call', function(done) { var bidi_stream_definition = { path: '/EchoService/EchoBidiStream', requestStream: true, responseStream: true }; registry = new CallRegistry(done, [ bidi_stream_definition, 'result_bidi_stream' ]); var metadata = new Metadata(); var message = {value: initial_value}; var bidi_stream = client.echoBidiStream(metadata, options); bidi_stream.on('data', function(data) { assert.strictEqual(data.value, expected_value); registry.addCall('result_bidi_stream'); }); bidi_stream.write(message); bidi_stream.end(); }); }); it('uses interceptors passed to the client constructor', function(done) { var registry = new CallRegistry(done, { 'constructor_interceptor_a_echo': 1, 'constructor_interceptor_b_echoServerStream': 1, 'invocation_interceptor': 1, 'result_unary': 1, 'result_stream': 1, 'result_invocation': 1, 'status_stream': 1 }); var constructor_interceptor_a = function(options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function(metadata, listener, next) { registry.addCall('constructor_interceptor_a_echo'); next(metadata, listener); }).build(); return new InterceptingCall(nextCall(options), outbound); }; var constructor_interceptor_b = function(options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function(metadata, listener, next) { registry.addCall('constructor_interceptor_b_echoServerStream'); next(metadata, listener); }).build(); return new InterceptingCall(nextCall(options), outbound); }; var invocation_interceptor = function(options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function(metadata, listener, next) { registry.addCall('invocation_interceptor'); next(metadata, listener); }).build(); return new InterceptingCall(nextCall(options), outbound); }; var interceptor_providers = [ function(method_definition) { if (!method_definition.requestStream && !method_definition.responseStream) { return constructor_interceptor_a; } }, function(method_definition) { if (!method_definition.requestStream && method_definition.responseStream) { return constructor_interceptor_b; } } ]; var constructor_options = { interceptor_providers: interceptor_providers }; var int_client = new EchoClient('localhost:' + echo_port, insecureCreds, constructor_options); var message = {}; int_client.echo(message, function(error, response) { assert.ifError(error); registry.addCall('result_unary'); }); var stream = int_client.echoServerStream(message); stream.on('data', function() { registry.addCall('result_stream'); }); stream.on('status', (status) => { registry.addCall('status_stream'); }); stream.on('error', (error) => { assert.ifError(error); }); var options = { interceptors: [invocation_interceptor] }; int_client.echo(message, options, function() { registry.addCall('result_invocation'); }); }); it('will reject conflicting interceptor options at invocation', function(done) { const interceptor = (options, nextCall) => { new InterceptingCall(nextCall(options)); }; const interceptorProvider = methodDefinition => interceptor try { client.echo('message', { interceptors: [interceptor], interceptor_providers: [interceptorProvider] }, function () {}); } catch (e) { assert.equal(e.name, 'InterceptorConfigurationError'); done(); } }); it('will resolve interceptor providers at invocation', function(done) { var constructor_interceptor = function(options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function() { assert(false); }).build(); return new InterceptingCall(nextCall(options), outbound); }; var invocation_interceptor = function(options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function() { done(); }).build(); return new InterceptingCall(nextCall(options), outbound); }; var constructor_interceptor_providers = [ function() { return constructor_interceptor; } ]; var invocation_interceptor_providers = [ function() { return invocation_interceptor; } ]; var constructor_options = { interceptor_providers: constructor_interceptor_providers }; var int_client = new EchoClient('localhost:' + echo_port, insecureCreds, constructor_options); var message = {}; var options = { interceptor_providers: invocation_interceptor_providers }; int_client.echo(message, options, function() {}); }); describe('trigger a stack of interceptors in nested order', function() { var registry; var expected_calls = ['constructA', 'constructB', 'outboundA', 'outboundB', 'inboundB', 'inboundA', 'callDone']; var options; before(function() { var interceptor_a = function (options, nextCall) { registry.addCall('constructA'); var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { registry.addCall('outboundA'); var new_listener = (new ListenerBuilder()).withOnReceiveMessage( function (message, next) { registry.addCall('inboundA'); next(message); }).build(); next(metadata, new_listener); }).build(); return new clientGrpc.InterceptingCall(nextCall(options), outbound); }; var interceptor_b = function (options, nextCall) { registry.addCall('constructB'); var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { registry.addCall('outboundB'); var new_listener = (new ListenerBuilder()).withOnReceiveMessage( function (message, next) { registry.addCall('inboundB'); next(message); }).build(); next(metadata, new_listener); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [interceptor_a, interceptor_b] }; }); var metadata = new Metadata(); var message = {}; it('with unary call', function(done) { registry = new CallRegistry(done, expected_calls, true); client.echo(message, metadata, options, function(error, response){ assert.ifError(error); registry.addCall('callDone'); }); }); it('with client streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var client_stream = client.echoClientStream(metadata, options, function(error, response) { assert.ifError(error); registry.addCall('callDone'); }); client_stream.write(message); client_stream.end(); }); it('with server streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var stream = client.echoServerStream(message, metadata, options); stream.on('data', function() {}); stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('callDone'); }); stream.on('error', (error) => { assert.ifError(error); }); }); it('with bidi streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var bidi_stream = client.echoBidiStream(metadata, options); bidi_stream.on('data', function(){}); bidi_stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('callDone'); }); bidi_stream.on('error', (error) => { assert.ifError(error); }); bidi_stream.write(message); bidi_stream.end(); }); }); describe('trigger interceptors horizontally', function() { var expected_calls = [ 'interceptor_a_start', 'interceptor_b_start', 'interceptor_a_send', 'interceptor_b_send', 'call_end' ]; var registry; var options; var metadata = new Metadata(); var message = {}; before(function() { var interceptor_a = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { registry.addCall('interceptor_a_start'); next(metadata, listener); }) .withSendMessage(function (message, next) { registry.addCall('interceptor_a_send'); next(message); }).build(); return new InterceptingCall(nextCall(options), outbound); }; var interceptor_b = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { registry.addCall('interceptor_b_start'); next(metadata, listener); }) .withSendMessage(function (message, next) { registry.addCall('interceptor_b_send'); next(message); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [interceptor_a, interceptor_b] }; }); it('with unary call', function(done) { registry = new CallRegistry(done, expected_calls, true); client.echo(message, metadata, options, function(error, response){ assert.ifError(error); registry.addCall('call_end'); }); }); it('with client streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var client_stream = client.echoClientStream(metadata, options, function(error, response) { assert.ifError(error); registry.addCall('call_end'); }); client_stream.write(message); client_stream.end(); }); it('with server streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var stream = client.echoServerStream(message, metadata, options); stream.on('data', function() {}); stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('call_end'); }); stream.on('error', (error) => { assert.ifError(error); }); }); it('with bidi streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var bidi_stream = client.echoBidiStream(metadata, options); bidi_stream.on('data', function(){}); bidi_stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('call_end'); }); bidi_stream.on('error', (error) => { assert.ifError(error); }); bidi_stream.write(message); bidi_stream.end(); }); }); describe('trigger when sending metadata', function() { var registry; var message = {}; var key_names = ['original', 'foo', 'bar']; var keys = { original: 'originalkey', foo: 'fookey', bar: 'barkey' }; var values = { original: 'originalvalue', foo: 'foovalue', bar: 'barvalue' }; var expected_calls = ['foo', 'bar', 'response', 'end']; var options; before(function () { var foo_interceptor = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { metadata.add(keys.foo, values.foo); registry.addCall('foo'); next(metadata, listener); }).build(); return new InterceptingCall(nextCall(options), outbound); }; var bar_interceptor = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { metadata.add(keys.bar, values.bar); registry.addCall('bar'); next(metadata, listener); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [foo_interceptor, bar_interceptor] }; }); it('with unary call', function (done) { registry = new CallRegistry(done, expected_calls, true); var metadata = new Metadata(); metadata.add(keys.original, values.original); var unary_call = client.echo(message, metadata, options, function (error, response) { assert.ifError(error); registry.addCall('end'); }); unary_call.on('metadata', function (metadata) { var has_expected_values = _.every(key_names, function (key_name) { return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); }); assert(has_expected_values); registry.addCall('response'); }); }); it('with client streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var metadata = new Metadata(); metadata.add(keys.original, values.original); var client_stream = client.echoClientStream(metadata, options, function (error, response) { assert.ifError(error); registry.addCall('end'); }); client_stream.write(message); client_stream.on('metadata', function (metadata) { var has_expected_values = _.every(key_names, function (key_name) { return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); }); assert(has_expected_values); registry.addCall('response'); }); client_stream.end(); }); it('with server streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var metadata = new Metadata(); metadata.add(keys.original, values.original); var server_stream = client.echoServerStream(message, metadata, options); server_stream.on('metadata', function (metadata) { var has_expected_values = _.every(key_names, function (key_name) { return _.isEqual(metadata.get(keys[key_name]), [values[key_name]]); }); assert(has_expected_values); registry.addCall('response'); }); server_stream.on('data', function() { }); server_stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('end'); }); server_stream.on('error', (error) => { assert.ifError(error); }); }); it('with bidi streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var metadata = new Metadata(); metadata.add(keys.original, values.original); var bidi_stream = client.echoBidiStream(metadata, options); bidi_stream.on('metadata', function(metadata) { var has_expected_values = _.every(key_names, function(key_name) { return _.isEqual(metadata.get(keys[key_name]),[values[key_name]]); }); assert(has_expected_values); bidi_stream.end(); registry.addCall('response'); }); bidi_stream.on('data', function() { }); bidi_stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('end'); }); bidi_stream.on('error', (error) => { assert.ifError(error); }); bidi_stream.write(message); bidi_stream.end(); }); }); describe('trigger when sending messages', function() { var registry; var originalValue = 'foo'; var expectedValue = 'bar'; var options; var metadata = new Metadata(); var expected_calls = ['messageIntercepted', 'response', 'end']; before(function() { var foo_interceptor = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withSendMessage(function (message, next) { assert.strictEqual(message.value, originalValue); registry.addCall('messageIntercepted'); next({ value: expectedValue }); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [foo_interceptor] }; }); it('with unary call', function(done) { registry = new CallRegistry(done, expected_calls, true); var message = {value: originalValue}; client.echo(message, metadata, options, function (err, response) { assert.ifError(err); assert.strictEqual(response.value, expectedValue); registry.addCall('response'); registry.addCall('end'); }); }); it('with client streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var message = {value: originalValue}; var client_stream = client.echoClientStream(metadata, options, function (err, response) { assert.ifError(err); assert.strictEqual(response.value, expectedValue); registry.addCall('response'); registry.addCall('end'); }); client_stream.write(message); client_stream.end(); }); it('with server streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var message = {value: originalValue}; var server_stream = client.echoServerStream(message, metadata, options); server_stream.on('data', function (data) { assert.strictEqual(data.value, expectedValue); registry.addCall('response'); }); server_stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('end'); }); server_stream.on('error', (error) => { assert.ifError(error); }); }); it('with bidi streaming call', function(done) { registry = new CallRegistry(done, expected_calls, true); var message = {value: originalValue}; var bidi_stream = client.echoBidiStream(metadata, options); bidi_stream.on('data', function(data) { assert.strictEqual(data.value, expectedValue); registry.addCall('response'); }); bidi_stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('end'); }); bidi_stream.on('error', (error) => { assert.ifError(error); }); bidi_stream.write(message); bidi_stream.end(); }); }); describe('trigger when client closes the call', function() { var registry; var expected_calls = [ 'response', 'halfClose', 'end' ]; var message = {}; var options; before(function() { var foo_interceptor = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withHalfClose(function (next) { registry.addCall('halfClose'); next(); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [foo_interceptor] }; }); it('with unary call', function (done) { registry = new CallRegistry(done, expected_calls); client.echo(message, options, function (err, response) { assert.ifError(err); registry.addCall('response'); registry.addCall('end'); }); }); it('with client streaming call', function (done) { registry = new CallRegistry(done, expected_calls); var client_stream = client.echoClientStream(options, function (err, response) { assert.ifError(err); registry.addCall('response'); registry.addCall('end'); }); client_stream.write(message); client_stream.end(); }); it('with server streaming call', function (done) { registry = new CallRegistry(done, expected_calls); var server_stream = client.echoServerStream(message, options); server_stream.on('data', function (data) { registry.addCall('response'); }); server_stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('end'); }); server_stream.on('error', (error) => { assert.ifError(error); }); }); it('with bidi streaming call', function (done) { registry = new CallRegistry(done, expected_calls); var bidi_stream = client.echoBidiStream(options); bidi_stream.on('data', function (data) { registry.addCall('response'); }); bidi_stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); registry.addCall('end'); }); bidi_stream.on('error', (error) => { assert.ifError(error); }); bidi_stream.write(message); bidi_stream.end(); }); }); describe('trigger when the stream is canceled', function() { var done; var message = {}; var options; before(function() { var foo_interceptor = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withCancel(function (next) { done(); next(); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [foo_interceptor] }; }); it('with unary call', function(cb) { done = cb; var stream = client.echo(message, options, function() {}); stream.cancel(); }); it('with client streaming call', function(cb) { done = cb; var stream = client.echoClientStream(options, function() {}); stream.cancel(); }); it('with server streaming call', function(cb) { done = cb; var stream = client.echoServerStream(message, options); stream.on('error', (error) => { assert.strictEqual(error.code, clientGrpc.status.CANCELLED); }); stream.cancel(); }); it('with bidi streaming call', function(cb) { done = cb; var stream = client.echoBidiStream(options); stream.on('error', (error) => { assert.strictEqual(error.code, clientGrpc.status.CANCELLED); }); stream.cancel(); }); }); describe('trigger when receiving metadata', function() { var message = {}; var expectedKey = 'foo'; var expectedValue = 'bar'; var options; before(function() { var foo_interceptor = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { var new_listener = (new ListenerBuilder()).withOnReceiveMetadata( function (metadata, next) { metadata.add(expectedKey, expectedValue); next(metadata); }).build(); next(metadata, new_listener); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [foo_interceptor] }; }); it('with unary call', function(done) { var metadata = new Metadata(); var unary_call = client.echo(message, metadata, options, function () {}); unary_call.on('metadata', function (metadata) { assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); done(); }); }); it('with client streaming call', function(done) { var metadata = new Metadata(); var client_stream = client.echoClientStream(metadata, options, function () {}); client_stream.write(message); client_stream.on('metadata', function (metadata) { assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); done(); }); client_stream.end(); }); it('with server streaming call', function(done) { var metadata = new Metadata(); var server_stream = client.echoServerStream(message, metadata, options); server_stream.on('metadata', function (metadata) { assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); done(); }); server_stream.on('error', (error) => { assert.ifError(error); }); server_stream.on('data', function() { }); }); it('with bidi streaming call', function(done) { var metadata = new Metadata(); var bidi_stream = client.echoBidiStream(metadata, options); bidi_stream.on('metadata', function(metadata) { assert.strictEqual(metadata.get(expectedKey)[0], expectedValue); bidi_stream.end(); done(); }); bidi_stream.on('data', function() { }); bidi_stream.on('error', (error) => { assert.ifError(error); }) bidi_stream.write(message); }); }); describe('trigger when sending messages', function() { var originalValue = 'foo'; var expectedValue = 'bar'; var options; var metadata = new Metadata(); before(function() { var foo_interceptor = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { var new_listener = (new ListenerBuilder()).withOnReceiveMessage( function (message, next) { if (!message) { next(message); return; } assert.strictEqual(message.value, originalValue); message.value = expectedValue; next(message); }).build(); next(metadata, new_listener); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [foo_interceptor] }; }); it('with unary call', function (done) { var message = { value: originalValue }; client.echo(message, metadata, options, function (err, response) { assert.ifError(err); assert.strictEqual(response.value, expectedValue); done(); }); }); it('with client streaming call', function (done) { var message = { value: originalValue }; var client_stream = client.echoClientStream(metadata, options, function (err, response) { assert.ifError(err); assert.strictEqual(response.value, expectedValue); done(); }); client_stream.write(message); client_stream.end(); }); it('with server streaming call', function (done) { var message = { value: originalValue }; var server_stream = client.echoServerStream(message, metadata, options); server_stream.on('data', function (data) { assert.strictEqual(data.value, expectedValue); done(); }); server_stream.on('error', (error) => { assert.ifError(error); }); }); it('with bidi streaming call', function (done) { var message = { value: originalValue }; var bidi_stream = client.echoBidiStream(metadata, options); bidi_stream.on('data', function (data) { assert.strictEqual(data.value, expectedValue); done(); }); bidi_stream.on('error', (error) => { assert.ifError(error); }); bidi_stream.write(message); bidi_stream.end(); }); }); describe('trigger when receiving status', function() { var expectedStatus = 'foo'; var options; var metadata = new Metadata(); before(function() { var foo_interceptor = function (options, nextCall) { var outbound = (new RequesterBuilder()) .withStart(function (metadata, listener, next) { var new_listener = (new ListenerBuilder()).withOnReceiveStatus( function (status, next) { assert.strictEqual(status.code, 2); assert.strictEqual(status.details, 'test status message'); var new_status = { code: 1, details: expectedStatus, metadata: {} }; next(new_status); }).build(); next(metadata, new_listener); }).build(); return new InterceptingCall(nextCall(options), outbound); }; options = { interceptors: [foo_interceptor] }; }); it('with unary call', function (done) { var message = { value: 'error' }; var unary_call = client.echo(message, metadata, options, function () { }); unary_call.on('status', function (status) { assert.strictEqual(status.code, 1); assert.strictEqual(status.details, expectedStatus); done(); }); }); it('with client streaming call', function (done) { var message = { value: 'error' }; var client_stream = client.echoClientStream(metadata, options, function () { }); client_stream.on('status', function (status) { assert.strictEqual(status.code, 1); assert.strictEqual(status.details, expectedStatus); done(); }); client_stream.write(message); client_stream.end(); }); it('with server streaming call', function(done) { var message = {value: 'error'}; var server_stream = client.echoServerStream(message, metadata, options); server_stream.on('error', function (err) { }); server_stream.on('data', function (data) { }); server_stream.on('status', function (status) { assert.strictEqual(status.code, 1); assert.strictEqual(status.details, expectedStatus); done(); }); }); it('with bidi streaming call', function(done) { var message = {value: 'error'}; var bidi_stream = client.echoBidiStream(metadata, options); bidi_stream.on('error', function(err) {}); bidi_stream.on('data', function(data) {}); bidi_stream.on('status', function(status) { assert.strictEqual(status.code, 1); assert.strictEqual(status.details, expectedStatus); done(); }); bidi_stream.write(message); bidi_stream.end(); }); }); describe('delay streaming headers', function() { var options; var metadata = new Metadata(); before(function() { var foo_interceptor = function (options, nextCall) { var startNext; var startListener; var startMetadata; var methods = { start: function (metadata, listener, next) { startNext = next; startListener = listener; startMetadata = metadata; }, sendMessage: function (message, next) { startMetadata.set('fromMessage', message.value); startNext(startMetadata, startListener); next(message); } }; return new InterceptingCall(nextCall(options), methods); }; options = { interceptors: [foo_interceptor] }; }); it('with client streaming call', function (done) { var message = { value: 'foo' }; var client_stream = client.echoClientStream(metadata, options, function (error, response) { assert.ifError(error); done(); }); client_stream.on('metadata', function (metadata) { assert.equal(metadata.get('fromMessage'), 'foo'); }); client_stream.write(message); client_stream.end(); }); it('with bidi streaming call', function (done) { var message = { value: 'foo' }; var bidi_stream = client.echoBidiStream(metadata, options); bidi_stream.on('metadata', function (metadata) { assert.equal(metadata.get('fromMessage'), 'foo'); }); bidi_stream.on('data', () => {}); bidi_stream.on('status', (status) => { assert.strictEqual(status.code, clientGrpc.status.OK); done(); }); bidi_stream.on('error', (error) => { assert.ifError(error); }); bidi_stream.write(message); bidi_stream.end(); }); }); }); });