mirror of https://github.com/grpc/grpc-node.git
Add channelz tests and fix some bugs
This commit is contained in:
parent
acd9913687
commit
d60c4ea16f
|
@ -25,7 +25,7 @@ import { FilterStackFactory, FilterStack } from './filter-stack';
|
|||
import { Metadata } from './metadata';
|
||||
import { StreamDecoder } from './stream-decoder';
|
||||
import { ChannelImplementation } from './channel';
|
||||
import { Subchannel } from './subchannel';
|
||||
import { SubchannelCallStatsTracker, Subchannel } from './subchannel';
|
||||
import * as logging from './logging';
|
||||
import { LogVerbosity } from './constants';
|
||||
import { ServerSurfaceCall } from './server-call';
|
||||
|
@ -252,6 +252,8 @@ export class Http2CallStream implements Call {
|
|||
private statusWatchers: ((status: StatusObject) => void)[] = [];
|
||||
private streamEndWatchers: ((success: boolean) => void)[] = [];
|
||||
|
||||
private callStatsTracker: SubchannelCallStatsTracker | null = null;
|
||||
|
||||
constructor(
|
||||
private readonly methodName: string,
|
||||
private readonly channel: ChannelImplementation,
|
||||
|
@ -468,10 +470,16 @@ export class Http2CallStream implements Call {
|
|||
this.endCall(status);
|
||||
}
|
||||
|
||||
private writeMessageToStream(message: Buffer, callback: WriteCallback) {
|
||||
this.callStatsTracker?.addMessageSent();
|
||||
this.http2Stream!.write(message, callback);
|
||||
}
|
||||
|
||||
attachHttp2Stream(
|
||||
stream: http2.ClientHttp2Stream,
|
||||
subchannel: Subchannel,
|
||||
extraFilters: FilterFactory<Filter>[]
|
||||
extraFilters: FilterFactory<Filter>[],
|
||||
callStatsTracker: SubchannelCallStatsTracker
|
||||
): void {
|
||||
this.filterStack.push(
|
||||
extraFilters.map((filterFactory) => filterFactory.createFilter(this))
|
||||
|
@ -484,6 +492,7 @@ export class Http2CallStream implements Call {
|
|||
);
|
||||
this.http2Stream = stream;
|
||||
this.subchannel = subchannel;
|
||||
this.callStatsTracker = callStatsTracker;
|
||||
subchannel.addDisconnectListener(this.disconnectListener);
|
||||
subchannel.callRef();
|
||||
stream.on('response', (headers, flags) => {
|
||||
|
@ -549,6 +558,7 @@ export class Http2CallStream implements Call {
|
|||
|
||||
for (const message of messages) {
|
||||
this.trace('parsed message of length ' + message.length);
|
||||
this.callStatsTracker!.addMessageReceived();
|
||||
this.tryPush(message);
|
||||
}
|
||||
});
|
||||
|
@ -666,7 +676,7 @@ export class Http2CallStream implements Call {
|
|||
this.pendingWrite.length +
|
||||
' (deferred)'
|
||||
);
|
||||
stream.write(this.pendingWrite, this.pendingWriteCallback);
|
||||
this.writeMessageToStream(this.pendingWrite, this.pendingWriteCallback);
|
||||
}
|
||||
this.maybeCloseWrites();
|
||||
}
|
||||
|
@ -802,7 +812,7 @@ export class Http2CallStream implements Call {
|
|||
this.pendingWriteCallback = cb;
|
||||
} else {
|
||||
this.trace('sending data chunk of length ' + message.message.length);
|
||||
this.http2Stream.write(message.message, cb);
|
||||
this.writeMessageToStream(message.message, cb);
|
||||
this.maybeCloseWrites();
|
||||
}
|
||||
}, this.handleFilterError.bind(this));
|
||||
|
|
|
@ -103,6 +103,12 @@ export interface Channel {
|
|||
deadline: Date | number,
|
||||
callback: (error?: Error) => void
|
||||
): void;
|
||||
/**
|
||||
* Get the channelz reference object for this channel. A request to the
|
||||
* channelz service for the id in this object will provide information
|
||||
* about this channel.
|
||||
*/
|
||||
getChannelzRef(): ChannelRef;
|
||||
/**
|
||||
* Create a call object. Call is an opaque type that is used by the Client
|
||||
* class. This function is called by the gRPC library when starting a
|
||||
|
@ -243,7 +249,7 @@ export class ChannelImplementation implements Channel {
|
|||
Object.assign({}, this.options, subchannelArgs),
|
||||
this.credentials
|
||||
);
|
||||
this.channelzTrace.addTrace('CT_INFO', 'Created or got existing subchannel', subchannel.getChannelzRef());
|
||||
this.channelzTrace.addTrace('CT_INFO', 'Created subchannel or used existing subchannel', subchannel.getChannelzRef());
|
||||
return subchannel;
|
||||
},
|
||||
updateState: (connectivityState: ConnectivityState, picker: Picker) => {
|
||||
|
@ -677,6 +683,10 @@ export class ChannelImplementation implements Channel {
|
|||
this.connectivityStateWatchers.push(watcherObject);
|
||||
}
|
||||
|
||||
getChannelzRef() {
|
||||
return this.channelzRef;
|
||||
}
|
||||
|
||||
createCall(
|
||||
method: string,
|
||||
deadline: Deadline,
|
||||
|
|
|
@ -51,6 +51,7 @@ import { ChannelzDefinition, ChannelzHandlers } from "./generated/grpc/channelz/
|
|||
import { ProtoGrpcType as ChannelzProtoGrpcType } from "./generated/channelz";
|
||||
import type { loadSync } from '@grpc/proto-loader';
|
||||
import { registerAdminService } from "./admin";
|
||||
import { loadPackageDefinition } from "./make-client";
|
||||
|
||||
export type TraceSeverity = 'CT_UNKNOWN' | 'CT_INFO' | 'CT_WARNING' | 'CT_ERROR';
|
||||
|
||||
|
@ -455,9 +456,10 @@ function dateToProtoTimestamp(date?: Date | null): Timestamp | null {
|
|||
if (!date) {
|
||||
return null;
|
||||
}
|
||||
const millisSinceEpoch = date.getTime();
|
||||
return {
|
||||
seconds: date.getSeconds(),
|
||||
nanos: date.getMilliseconds() * 1_000_000
|
||||
seconds: (millisSinceEpoch / 1000) | 0,
|
||||
nanos: (millisSinceEpoch % 1000) * 1_000_000
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -664,7 +666,10 @@ function GetServerSockets(call: ServerUnaryCall<GetServerSocketsRequest__Output,
|
|||
const startId = Number.parseInt(call.request.start_socket_id);
|
||||
const maxResults = Number.parseInt(call.request.max_results);
|
||||
const resolvedInfo = serverEntry.getInfo();
|
||||
const allSockets = resolvedInfo.listenerChildren.sockets.concat(resolvedInfo.sessionChildren.sockets).sort((ref1, ref2) => ref1.id - ref2.id);
|
||||
// If we wanted to include listener sockets in the result, this line would
|
||||
// instead say
|
||||
// const allSockets = resolvedInfo.listenerChildren.sockets.concat(resolvedInfo.sessionChildren.sockets).sort((ref1, ref2) => ref1.id - ref2.id);
|
||||
const allSockets = resolvedInfo.sessionChildren.sockets.sort((ref1, ref2) => ref1.id - ref2.id);
|
||||
const resultList: SocketRefMessage[] = [];
|
||||
let i = 0;
|
||||
for (; i < allSockets.length; i++) {
|
||||
|
@ -709,10 +714,11 @@ export function getChannelzServiceDefinition(): ChannelzDefinition {
|
|||
defaults: true,
|
||||
oneofs: true,
|
||||
includeDirs: [
|
||||
'../../proto'
|
||||
`${__dirname}/../../proto`
|
||||
]
|
||||
}) as unknown as ChannelzProtoGrpcType;
|
||||
loadedChannelzDefinition = loadedProto.grpc.channelz.v1.Channelz.service;
|
||||
});
|
||||
const channelzGrpcObject = loadPackageDefinition(loadedProto) as unknown as ChannelzProtoGrpcType;
|
||||
loadedChannelzDefinition = channelzGrpcObject.grpc.channelz.v1.Channelz.service;
|
||||
return loadedChannelzDefinition;
|
||||
}
|
||||
|
||||
|
|
|
@ -570,7 +570,6 @@ export class Http2ServerCallStream<
|
|||
const response = this.serializeMessage(value!);
|
||||
|
||||
this.write(response);
|
||||
this.emit('sendMessage');
|
||||
this.sendStatus({ code: Status.OK, details: 'OK', metadata });
|
||||
} catch (err) {
|
||||
err.code = Status.INTERNAL;
|
||||
|
|
|
@ -188,7 +188,7 @@ export class Server {
|
|||
const peerCertificate = tlsSocket.getPeerCertificate();
|
||||
tlsInfo = {
|
||||
cipherSuiteStandardName: cipherInfo.standardName ?? null,
|
||||
cipherSuiteOtherName: cipherInfo.standardName ? cipherInfo.name: null,
|
||||
cipherSuiteOtherName: cipherInfo.standardName ? null : cipherInfo.name,
|
||||
localCertificate: (certificate && 'raw' in certificate) ? certificate.raw : null,
|
||||
remoteCertificate: (peerCertificate && 'raw' in peerCertificate) ? peerCertificate.raw : null
|
||||
};
|
||||
|
@ -424,6 +424,7 @@ export class Server {
|
|||
remoteFlowControlWindow: null
|
||||
};
|
||||
});
|
||||
this.listenerChildrenTracker.refChild(channelzRef);
|
||||
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
|
||||
trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
|
||||
resolve('port' in boundSubchannelAddress ? boundSubchannelAddress.port : portNum);
|
||||
|
@ -491,6 +492,7 @@ export class Server {
|
|||
remoteFlowControlWindow: null
|
||||
};
|
||||
});
|
||||
this.listenerChildrenTracker.refChild(channelzRef);
|
||||
this.http2ServerList.push({server: http2Server, channelzRef: channelzRef});
|
||||
trace('Successfully bound ' + subchannelAddressToString(boundSubchannelAddress));
|
||||
resolve(
|
||||
|
@ -676,6 +678,10 @@ export class Server {
|
|||
throw new Error('Not yet implemented');
|
||||
}
|
||||
|
||||
getChannelzRef() {
|
||||
return this.channelzRef;
|
||||
}
|
||||
|
||||
private _setupHandlers(
|
||||
http2Server: http2.Http2Server | http2.Http2SecureServer
|
||||
): void {
|
||||
|
|
|
@ -68,6 +68,11 @@ export type ConnectivityStateListener = (
|
|||
newState: ConnectivityState
|
||||
) => void;
|
||||
|
||||
export interface SubchannelCallStatsTracker {
|
||||
addMessageSent(): void;
|
||||
addMessageReceived(): void;
|
||||
}
|
||||
|
||||
const {
|
||||
HTTP2_HEADER_AUTHORITY,
|
||||
HTTP2_HEADER_CONTENT_TYPE,
|
||||
|
@ -178,33 +183,6 @@ export class Subchannel {
|
|||
private messagesReceived = 0;
|
||||
private lastMessageSentTimestamp: Date | null = null;
|
||||
private lastMessageReceivedTimestamp: Date | null = null;
|
||||
private MessageCountFilter = class extends BaseFilter implements Filter {
|
||||
private session: http2.ClientHttp2Session;
|
||||
constructor(private parent: Subchannel) {
|
||||
super();
|
||||
this.session = parent.session!;
|
||||
}
|
||||
sendMessage(message: Promise<WriteObject>): Promise<WriteObject> {
|
||||
if (this.parent.session === this.session) {
|
||||
this.parent.messagesSent += 1;
|
||||
this.parent.lastMessageSentTimestamp = new Date();
|
||||
}
|
||||
return message;
|
||||
}
|
||||
receiveMessage(message: Promise<Buffer>): Promise<Buffer> {
|
||||
if (this.parent.session === this.session) {
|
||||
this.parent.messagesReceived += 1;
|
||||
this.parent.lastMessageReceivedTimestamp = new Date();
|
||||
}
|
||||
return message;
|
||||
}
|
||||
};
|
||||
private MessageCountFilterFactory = class implements FilterFactory<Filter> {
|
||||
constructor(private parent: Subchannel) {}
|
||||
createFilter(callStream: Call): Filter {
|
||||
return new this.parent.MessageCountFilter(this.parent);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A class representing a connection to a single backend.
|
||||
|
@ -848,8 +826,15 @@ export class Subchannel {
|
|||
}
|
||||
}
|
||||
});
|
||||
extraFilterFactories.push(new this.MessageCountFilterFactory(this));
|
||||
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactories);
|
||||
callStream.attachHttp2Stream(http2Stream, this, extraFilterFactories, {
|
||||
addMessageSent: () => {
|
||||
this.messagesSent += 1;
|
||||
this.lastMessageSentTimestamp = new Date();
|
||||
},
|
||||
addMessageReceived: () => {
|
||||
this.messagesReceived += 1;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,289 @@
|
|||
/*
|
||||
* Copyright 2019 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
import * as assert from 'assert';
|
||||
import * as protoLoader from '@grpc/proto-loader';
|
||||
import * as grpc from '../src';
|
||||
|
||||
import { ProtoGrpcType } from '../src/generated/channelz'
|
||||
import { ChannelzClient } from '../src/generated/grpc/channelz/v1/Channelz';
|
||||
import { Channel__Output } from '../src/generated/grpc/channelz/v1/Channel';
|
||||
import { Server__Output } from '../src/generated/grpc/channelz/v1/Server';
|
||||
import { ServiceClient, ServiceClientConstructor } from '../src/make-client';
|
||||
import { loadProtoFile } from './common';
|
||||
|
||||
const loadedChannelzProto = protoLoader.loadSync('channelz.proto', {
|
||||
keepCase: true,
|
||||
longs: String,
|
||||
enums: String,
|
||||
defaults: true,
|
||||
oneofs: true,
|
||||
includeDirs: [
|
||||
`${__dirname}/../../proto`
|
||||
]
|
||||
});
|
||||
const channelzGrpcObject = grpc.loadPackageDefinition(loadedChannelzProto) as unknown as ProtoGrpcType;
|
||||
|
||||
const TestServiceClient = loadProtoFile(`${__dirname}/fixtures/test_service.proto`).TestService as ServiceClientConstructor;
|
||||
|
||||
const testServiceImpl: grpc.UntypedServiceImplementation = {
|
||||
unary(call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
|
||||
if (call.request.error) {
|
||||
setTimeout(() => {
|
||||
callback({
|
||||
code: grpc.status.INVALID_ARGUMENT,
|
||||
details: call.request.message
|
||||
});
|
||||
}, call.request.errorAfter)
|
||||
} else {
|
||||
callback(null, {count: 1});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
describe('Channelz', () => {
|
||||
let channelzServer: grpc.Server;
|
||||
let channelzClient: ChannelzClient;
|
||||
let testServer: grpc.Server;
|
||||
let testClient: ServiceClient;
|
||||
|
||||
before((done) => {
|
||||
channelzServer = new grpc.Server();
|
||||
channelzServer.addService(grpc.getChannelzServiceDefinition(), grpc.getChannelzHandlers());
|
||||
channelzServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
|
||||
if (error) {
|
||||
done(error);
|
||||
return;
|
||||
}
|
||||
channelzServer.start();
|
||||
channelzClient = new channelzGrpcObject.grpc.channelz.v1.Channelz(`localhost:${port}`, grpc.credentials.createInsecure());
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
after(() => {
|
||||
channelzClient.close();
|
||||
channelzServer.forceShutdown();
|
||||
});
|
||||
|
||||
beforeEach((done) => {
|
||||
testServer = new grpc.Server();
|
||||
testServer.addService(TestServiceClient.service, testServiceImpl);
|
||||
testServer.bindAsync('localhost:0', grpc.ServerCredentials.createInsecure(), (error, port) => {
|
||||
if (error) {
|
||||
done(error);
|
||||
return;
|
||||
}
|
||||
testServer.start();
|
||||
testClient = new TestServiceClient(`localhost:${port}`, grpc.credentials.createInsecure());
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
testClient.close();
|
||||
testServer.forceShutdown();
|
||||
});
|
||||
|
||||
it('should see a newly created channel', (done) => {
|
||||
// Test that the specific test client channel info can be retrieved
|
||||
channelzClient.GetChannel({channel_id: testClient.getChannel().getChannelzRef().id}, (error, result) => {
|
||||
assert.ifError(error);
|
||||
assert(result);
|
||||
assert(result.channel);
|
||||
assert(result.channel.ref);
|
||||
assert.strictEqual(+result.channel.ref.channel_id, testClient.getChannel().getChannelzRef().id);
|
||||
// Test that the channel is in the list of top channels
|
||||
channelzClient.getTopChannels({start_channel_id: testClient.getChannel().getChannelzRef().id, max_results:1}, (error, result) => {
|
||||
assert.ifError(error);
|
||||
assert(result);
|
||||
assert.strictEqual(result.channel.length, 1);
|
||||
assert(result.channel[0].ref);
|
||||
assert.strictEqual(+result.channel[0].ref.channel_id, testClient.getChannel().getChannelzRef().id);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('should see a newly created server', (done) => {
|
||||
// Test that the specific test server info can be retrieved
|
||||
channelzClient.getServer({server_id: testServer.getChannelzRef().id}, (error, result) => {
|
||||
assert.ifError(error);
|
||||
assert(result);
|
||||
assert(result.server);
|
||||
assert(result.server.ref);
|
||||
assert.strictEqual(+result.server.ref.server_id, testServer.getChannelzRef().id);
|
||||
// Test that the server is in the list of servers
|
||||
channelzClient.getServers({start_server_id: testServer.getChannelzRef().id, max_results: 1}, (error, result) => {
|
||||
assert.ifError(error);
|
||||
assert(result);
|
||||
assert.strictEqual(result.server.length, 1);
|
||||
assert(result.server[0].ref);
|
||||
assert.strictEqual(+result.server[0].ref.server_id, testServer.getChannelzRef().id);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('should count successful calls', (done) => {
|
||||
testClient.unary({}, (error: grpc.ServiceError, value: unknown) => {
|
||||
assert.ifError(error);
|
||||
// Channel data tests
|
||||
channelzClient.GetChannel({channel_id: testClient.getChannel().getChannelzRef().id}, (error, channelResult) => {
|
||||
assert.ifError(error);
|
||||
assert(channelResult);
|
||||
assert(channelResult.channel);
|
||||
assert(channelResult.channel.ref);
|
||||
assert(channelResult.channel.data);
|
||||
assert.strictEqual(+channelResult.channel.data.calls_started, 1);
|
||||
assert.strictEqual(+channelResult.channel.data.calls_succeeded, 1);
|
||||
assert.strictEqual(+channelResult.channel.data.calls_failed, 0);
|
||||
assert.strictEqual(channelResult.channel.subchannel_ref.length, 1);
|
||||
channelzClient.getSubchannel({subchannel_id: channelResult.channel.subchannel_ref[0].subchannel_id}, (error, subchannelResult) => {
|
||||
assert.ifError(error);
|
||||
assert(subchannelResult);
|
||||
assert(subchannelResult.subchannel);
|
||||
assert(subchannelResult.subchannel.ref);
|
||||
assert(subchannelResult.subchannel.data);
|
||||
assert.strictEqual(subchannelResult.subchannel.ref.subchannel_id, channelResult.channel!.subchannel_ref[0].subchannel_id);
|
||||
assert.strictEqual(+subchannelResult.subchannel.data.calls_started, 1);
|
||||
assert.strictEqual(+subchannelResult.subchannel.data.calls_succeeded, 1);
|
||||
assert.strictEqual(+subchannelResult.subchannel.data.calls_failed, 0);
|
||||
assert.strictEqual(subchannelResult.subchannel.socket_ref.length, 1);
|
||||
channelzClient.getSocket({socket_id: subchannelResult.subchannel.socket_ref[0].socket_id}, (error, socketResult) => {
|
||||
assert.ifError(error);
|
||||
assert(socketResult);
|
||||
assert(socketResult.socket);
|
||||
assert(socketResult.socket.ref);
|
||||
assert(socketResult.socket.data);
|
||||
assert.strictEqual(socketResult.socket.ref.socket_id, subchannelResult.subchannel!.socket_ref[0].socket_id);
|
||||
assert.strictEqual(+socketResult.socket.data.streams_started, 1);
|
||||
assert.strictEqual(+socketResult.socket.data.streams_succeeded, 1);
|
||||
assert.strictEqual(+socketResult.socket.data.streams_failed, 0);
|
||||
assert.strictEqual(+socketResult.socket.data.messages_received, 1);
|
||||
assert.strictEqual(+socketResult.socket.data.messages_sent, 1);
|
||||
// Server data tests
|
||||
channelzClient.getServer({server_id: testServer.getChannelzRef().id}, (error, serverResult) => {
|
||||
assert.ifError(error);
|
||||
assert(serverResult);
|
||||
assert(serverResult.server);
|
||||
assert(serverResult.server.ref);
|
||||
assert(serverResult.server.data);
|
||||
assert.strictEqual(+serverResult.server.ref.server_id, testServer.getChannelzRef().id);
|
||||
assert.strictEqual(+serverResult.server.data.calls_started, 1);
|
||||
assert.strictEqual(+serverResult.server.data.calls_succeeded, 1);
|
||||
assert.strictEqual(+serverResult.server.data.calls_failed, 0);
|
||||
channelzClient.getServerSockets({server_id: testServer.getChannelzRef().id}, (error, socketsResult) => {
|
||||
assert.ifError(error);
|
||||
assert(socketsResult);
|
||||
assert.strictEqual(socketsResult.socket_ref.length, 1);
|
||||
channelzClient.getSocket({socket_id: socketsResult.socket_ref[0].socket_id}, (error, serverSocketResult) => {
|
||||
assert.ifError(error);
|
||||
assert(serverSocketResult);
|
||||
assert(serverSocketResult.socket);
|
||||
assert(serverSocketResult.socket.ref);
|
||||
assert(serverSocketResult.socket.data);
|
||||
assert.strictEqual(serverSocketResult.socket.ref.socket_id, socketsResult.socket_ref[0].socket_id);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.streams_started, 1);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.streams_succeeded, 1);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.streams_failed, 0);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.messages_received, 1);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.messages_sent, 1);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('should count failed calls', (done) => {
|
||||
testClient.unary({error: true}, (error: grpc.ServiceError, value: unknown) => {
|
||||
assert(error);
|
||||
// Channel data tests
|
||||
channelzClient.GetChannel({channel_id: testClient.getChannel().getChannelzRef().id}, (error, channelResult) => {
|
||||
assert.ifError(error);
|
||||
assert(channelResult);
|
||||
assert(channelResult.channel);
|
||||
assert(channelResult.channel.ref);
|
||||
assert(channelResult.channel.data);
|
||||
assert.strictEqual(+channelResult.channel.data.calls_started, 1);
|
||||
assert.strictEqual(+channelResult.channel.data.calls_succeeded, 0);
|
||||
assert.strictEqual(+channelResult.channel.data.calls_failed, 1);
|
||||
assert.strictEqual(channelResult.channel.subchannel_ref.length, 1);
|
||||
channelzClient.getSubchannel({subchannel_id: channelResult.channel.subchannel_ref[0].subchannel_id}, (error, subchannelResult) => {
|
||||
assert.ifError(error);
|
||||
assert(subchannelResult);
|
||||
assert(subchannelResult.subchannel);
|
||||
assert(subchannelResult.subchannel.ref);
|
||||
assert(subchannelResult.subchannel.data);
|
||||
assert.strictEqual(subchannelResult.subchannel.ref.subchannel_id, channelResult.channel!.subchannel_ref[0].subchannel_id);
|
||||
assert.strictEqual(+subchannelResult.subchannel.data.calls_started, 1);
|
||||
assert.strictEqual(+subchannelResult.subchannel.data.calls_succeeded, 0);
|
||||
assert.strictEqual(+subchannelResult.subchannel.data.calls_failed, 1);
|
||||
assert.strictEqual(subchannelResult.subchannel.socket_ref.length, 1);
|
||||
channelzClient.getSocket({socket_id: subchannelResult.subchannel.socket_ref[0].socket_id}, (error, socketResult) => {
|
||||
assert.ifError(error);
|
||||
assert(socketResult);
|
||||
assert(socketResult.socket);
|
||||
assert(socketResult.socket.ref);
|
||||
assert(socketResult.socket.data);
|
||||
assert.strictEqual(socketResult.socket.ref.socket_id, subchannelResult.subchannel!.socket_ref[0].socket_id);
|
||||
assert.strictEqual(+socketResult.socket.data.streams_started, 1);
|
||||
assert.strictEqual(+socketResult.socket.data.streams_succeeded, 1);
|
||||
assert.strictEqual(+socketResult.socket.data.streams_failed, 0);
|
||||
assert.strictEqual(+socketResult.socket.data.messages_received, 0);
|
||||
assert.strictEqual(+socketResult.socket.data.messages_sent, 1);
|
||||
// Server data tests
|
||||
channelzClient.getServer({server_id: testServer.getChannelzRef().id}, (error, serverResult) => {
|
||||
assert.ifError(error);
|
||||
assert(serverResult);
|
||||
assert(serverResult.server);
|
||||
assert(serverResult.server.ref);
|
||||
assert(serverResult.server.data);
|
||||
assert.strictEqual(+serverResult.server.ref.server_id, testServer.getChannelzRef().id);
|
||||
assert.strictEqual(+serverResult.server.data.calls_started, 1);
|
||||
assert.strictEqual(+serverResult.server.data.calls_succeeded, 0);
|
||||
assert.strictEqual(+serverResult.server.data.calls_failed, 1);
|
||||
channelzClient.getServerSockets({server_id: testServer.getChannelzRef().id}, (error, socketsResult) => {
|
||||
assert.ifError(error);
|
||||
assert(socketsResult);
|
||||
assert.strictEqual(socketsResult.socket_ref.length, 1);
|
||||
channelzClient.getSocket({socket_id: socketsResult.socket_ref[0].socket_id}, (error, serverSocketResult) => {
|
||||
assert.ifError(error);
|
||||
assert(serverSocketResult);
|
||||
assert(serverSocketResult.socket);
|
||||
assert(serverSocketResult.socket.ref);
|
||||
assert(serverSocketResult.socket.data);
|
||||
assert.strictEqual(serverSocketResult.socket.ref.socket_id, socketsResult.socket_ref[0].socket_id);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.streams_started, 1);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.streams_succeeded, 0);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.streams_failed, 1);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.messages_received, 1);
|
||||
assert.strictEqual(+serverSocketResult.socket.data.messages_sent, 0);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2021 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
'use strict';
|
||||
|
||||
require('../fixtures/js_js');
|
||||
const interopClient = require('../interop/interop_client');
|
||||
const interopServer = require('../interop/interop_server');
|
||||
const serverGrpc = require('../any_grpc').server;
|
||||
|
||||
const hostOverride = 'foo.test.google.fr';
|
||||
|
||||
const testCases = [
|
||||
'empty_unary',
|
||||
'large_unary',
|
||||
'client_streaming',
|
||||
'server_streaming',
|
||||
'ping_pong',
|
||||
'empty_stream',
|
||||
'cancel_after_begin',
|
||||
'cancel_after_first_response',
|
||||
'timeout_on_sleeping_server',
|
||||
'custom_metadata',
|
||||
'status_code_and_message',
|
||||
'special_status_message',
|
||||
'unimplemented_service',
|
||||
'unimplemented_method'
|
||||
];
|
||||
|
||||
function getRandomTest() {
|
||||
return testCases[(Math.random() * testCases.length) | 0];
|
||||
}
|
||||
|
||||
let testCompleteCount = 0;
|
||||
|
||||
interopServer.getServer('0', true, (error, result) => {
|
||||
if (error) {
|
||||
throw error;
|
||||
}
|
||||
const channelzServer = new serverGrpc.Server();
|
||||
channelzServer.bindAsync('localhost:0', serverGrpc.ServerCredentials.createInsecure(), (error, port) => {
|
||||
if (error) {
|
||||
throw error;
|
||||
}
|
||||
console.log(`Serving channelz at port ${port}`);
|
||||
serverGrpc.addAdminServicesToServer(channelzServer);
|
||||
channelzServer.start();
|
||||
result.server.start();
|
||||
setInterval(() => {
|
||||
interopClient.runTest(`localhost:${result.port}`, hostOverride, getRandomTest(), true, true, () => {
|
||||
testCompleteCount += 1;
|
||||
if (testCompleteCount % 100 === 0) {
|
||||
console.log(`Completed ${testCompleteCount} tests`);
|
||||
}
|
||||
});
|
||||
}, 100);
|
||||
});
|
||||
})
|
|
@ -200,7 +200,7 @@ function handleHalfDuplex(call) {
|
|||
* Get a server object bound to the given port
|
||||
* @param {string} port Port to which to bind
|
||||
* @param {boolean} tls Indicates that the bound port should use TLS
|
||||
* @param {function(Error, {{server: Server, port: number}})} callback Callback
|
||||
* @param {function(Error, {server: Server, port: number})} callback Callback
|
||||
* to call with result or error
|
||||
* @param {object?} options Optional additional options to use when
|
||||
* constructing the server
|
||||
|
|
Loading…
Reference in New Issue