/*
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import * as assert from 'assert';

import * as grpc from '../src';
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';

import { loadProtoFile } from './common';

/**
 * Wraps a completion callback so that it only fires once the wrapper has been
 * invoked `target` times.
 *
 * Every invocation at or beyond `target` calls `done` again, so an unexpected
 * extra invocation is not silently swallowed.
 *
 * @param done Callback to run once enough invocations have been counted.
 * @param target Number of invocations required before `done` is called.
 * @returns A zero-argument function that counts its own invocations.
 */
function multiDone(done: () => void, target: number) {
  let count = 0;
  return () => {
    count++;
    if (count >= target) {
      done();
    }
  };
}

/**
 * Builds a deadline that lies `ms` milliseconds after the current time.
 *
 * @param ms Offset from now, in milliseconds.
 * @returns The absolute deadline as a `Date`.
 */
function deadlineFromNow(ms: number): Date {
  return new Date(Date.now() + ms);
}

/*
 * Topology used by every test: `proxyClient` calls `proxyServer`, whose
 * per-test handler forwards the call through `client` to `server`, passing the
 * incoming server call as the `parent` option. The handlers registered on
 * `server` never respond, so each test's outcome comes from the cancellation
 * or deadline it sets up rather than from a normal reply.
 */
describe('Call propagation', () => {
  let server: grpc.Server;
  let Client: ServiceClientConstructor;
  let client: ServiceClient;
  let proxyServer: grpc.Server;
  let proxyClient: ServiceClient;

  before((done) => {
    Client = loadProtoFile(__dirname + '/fixtures/test_service.proto').TestService as ServiceClientConstructor;
    server = new grpc.Server();
    // Handlers that never answer.
    server.addService(Client.service, {
      unary: () => {},
      clientStream: () => {},
      serverStream: () => {},
      bidiStream: () => {}
    });
    proxyServer = new grpc.Server();
    server.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
      if (error) {
        done(error);
        return;
      }
      server.start();
      client = new Client(`localhost:${port}`, grpc.credentials.createInsecure());
      proxyServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, proxyPort) => {
        if (error) {
          done(error);
          return;
        }
        proxyServer.start();
        proxyClient = new Client(`localhost:${proxyPort}`, grpc.credentials.createInsecure());
        done();
      });
    });
  });

  // Each test registers its own proxy handlers, so drop them after every test.
  afterEach(() => {
    proxyServer.removeService(Client.service);
  });

  after(() => {
    // The clients are unset if `before` failed before creating them.
    if (client) {
      client.close();
    }
    if (proxyClient) {
      proxyClient.close();
    }
    server.forceShutdown();
    proxyServer.forceShutdown();
  });

  describe('Cancellation', () => {
    it('should work with unary requests', (done) => {
      // Both the outer call and the forwarded call must report CANCELLED.
      done = multiDone(done, 2);
      let call: grpc.ClientUnaryCall;
      proxyServer.addService(Client.service, {
        unary: (parent: grpc.ServerUnaryCall<unknown, unknown>, callback: grpc.sendUnaryData<unknown>) => {
          client.unary(parent.request, {parent: parent}, (error: grpc.ServiceError, value: unknown) => {
            callback(error, value);
            assert(error);
            assert.strictEqual(error.code, grpc.status.CANCELLED);
            done();
          });
          /* Cancel the original call after the server starts processing it to
           * ensure that it does reach the server. */
          call.cancel();
        }
      });
      call = proxyClient.unary({}, (error: grpc.ServiceError, value: unknown) => {
        assert(error);
        assert.strictEqual(error.code, grpc.status.CANCELLED);
        done();
      });
    });

    it('Should work with client streaming requests', (done) => {
      done = multiDone(done, 2);
      let call: grpc.ClientWritableStream<unknown>;
      proxyServer.addService(Client.service, {
        clientStream: (parent: grpc.ServerReadableStream<unknown, unknown>, callback: grpc.sendUnaryData<unknown>) => {
          client.clientStream({parent: parent}, (error: grpc.ServiceError, value: unknown) => {
            callback(error, value);
            assert(error);
            assert.strictEqual(error.code, grpc.status.CANCELLED);
            done();
          });
          /* Cancel the original call after the server starts processing it to
           * ensure that it does reach the server. */
          call.cancel();
        }
      });
      call = proxyClient.clientStream((error: grpc.ServiceError, value: unknown) => {
        assert(error);
        assert.strictEqual(error.code, grpc.status.CANCELLED);
        done();
      });
    });

    it('Should work with server streaming requests', (done) => {
      done = multiDone(done, 2);
      let call: grpc.ClientReadableStream<unknown>;
      proxyServer.addService(Client.service, {
        serverStream: (parent: grpc.ServerWritableStream<unknown, unknown>) => {
          const child = client.serverStream(parent.request, {parent: parent});
          // 'error' events are ignored; the assertion is made on 'status'.
          child.on('error', () => {});
          child.on('status', (status: grpc.StatusObject) => {
            assert.strictEqual(status.code, grpc.status.CANCELLED);
            done();
          });
          call.cancel();
        }
      });
      call = proxyClient.serverStream({});
      call.on('error', () => {});
      call.on('status', (status: grpc.StatusObject) => {
        assert.strictEqual(status.code, grpc.status.CANCELLED);
        done();
      });
    });

    it('Should work with bidi streaming requests', (done) => {
      done = multiDone(done, 2);
      let call: grpc.ClientDuplexStream<unknown, unknown>;
      proxyServer.addService(Client.service, {
        bidiStream: (parent: grpc.ServerDuplexStream<unknown, unknown>) => {
          const child = client.bidiStream({parent: parent});
          // 'error' events are ignored; the assertion is made on 'status'.
          child.on('error', () => {});
          child.on('status', (status: grpc.StatusObject) => {
            assert.strictEqual(status.code, grpc.status.CANCELLED);
            done();
          });
          call.cancel();
        }
      });
      call = proxyClient.bidiStream();
      call.on('error', () => {});
      call.on('status', (status: grpc.StatusObject) => {
        assert.strictEqual(status.code, grpc.status.CANCELLED);
        done();
      });
    });
  });

  describe('Deadlines', () => {
    it('should work with unary requests', (done) => {
      // Both the outer call and the forwarded call must report DEADLINE_EXCEEDED.
      done = multiDone(done, 2);
      proxyServer.addService(Client.service, {
        unary: (parent: grpc.ServerUnaryCall<unknown, unknown>, callback: grpc.sendUnaryData<unknown>) => {
          client.unary(parent.request, {parent: parent, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => {
            callback(error, value);
            assert(error);
            assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
            done();
          });
        }
      });
      const deadline = deadlineFromNow(100);
      proxyClient.unary({}, {deadline}, (error: grpc.ServiceError, value: unknown) => {
        assert(error);
        assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
        done();
      });
    });

    it('Should work with client streaming requests', (done) => {
      done = multiDone(done, 2);
      proxyServer.addService(Client.service, {
        clientStream: (parent: grpc.ServerReadableStream<unknown, unknown>, callback: grpc.sendUnaryData<unknown>) => {
          client.clientStream({parent: parent, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => {
            callback(error, value);
            assert(error);
            assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
            done();
          });
        }
      });
      const deadline = deadlineFromNow(100);
      proxyClient.clientStream({deadline, propagate_flags: grpc.propagate.DEADLINE}, (error: grpc.ServiceError, value: unknown) => {
        assert(error);
        assert.strictEqual(error.code, grpc.status.DEADLINE_EXCEEDED);
        done();
      });
    });

    it('Should work with server streaming requests', (done) => {
      done = multiDone(done, 2);
      proxyServer.addService(Client.service, {
        serverStream: (parent: grpc.ServerWritableStream<unknown, unknown>) => {
          const child = client.serverStream(parent.request, {parent: parent, propagate_flags: grpc.propagate.DEADLINE});
          // 'error' events are ignored; the assertion is made on 'status'.
          child.on('error', () => {});
          child.on('status', (status: grpc.StatusObject) => {
            assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED);
            done();
          });
        }
      });
      const deadline = deadlineFromNow(100);
      const call: grpc.ClientReadableStream<unknown> = proxyClient.serverStream({}, {deadline});
      call.on('error', () => {});
      call.on('status', (status: grpc.StatusObject) => {
        assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED);
        done();
      });
    });

    it('Should work with bidi streaming requests', (done) => {
      done = multiDone(done, 2);
      proxyServer.addService(Client.service, {
        bidiStream: (parent: grpc.ServerDuplexStream<unknown, unknown>) => {
          const child = client.bidiStream({parent: parent, propagate_flags: grpc.propagate.DEADLINE});
          // 'error' events are ignored; the assertion is made on 'status'.
          child.on('error', () => {});
          child.on('status', (status: grpc.StatusObject) => {
            assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED);
            done();
          });
        }
      });
      const deadline = deadlineFromNow(100);
      const call: grpc.ClientDuplexStream<unknown, unknown> = proxyClient.bidiStream({deadline});
      call.on('error', () => {});
      call.on('status', (status: grpc.StatusObject) => {
        assert.strictEqual(status.code, grpc.status.DEADLINE_EXCEEDED);
        done();
      });
    });
  });
});