fix: perf issues in hot paths

1. no unused timers, wrap tracing calls to avoid stringifying
2. track graceful end of the call and avoid emitting 'cancelled' in such cases
3. remove validate calls in metadata on operations where it's not needed
4. refactor server session stream handlers into separate channelz enabled/disabled handlers
5. refactor message request logic - reduce amount of microtasks generated
6. improve sendStatus a little when there is no metadata involved
This commit is contained in:
AVVS 2022-10-19 14:48:11 -07:00
parent c84b4f9664
commit 2f124ad68b
No known key found for this signature in database
GPG Key ID: 2B890ABACCC3E369
5 changed files with 503 additions and 389 deletions

View File

@ -97,6 +97,10 @@ export interface StatusObject {
metadata: Metadata; metadata: Metadata;
} }
export type PartialStatusObject = Pick<StatusObject, 'code' | 'details'> & {
metadata: Metadata | null;
}
export const enum WriteFlags { export const enum WriteFlags {
BufferHint = 1, BufferHint = 1,
NoCompress = 2, NoCompress = 2,

View File

@ -25,102 +25,102 @@ export interface ChannelzClient extends grpc.Client {
/** /**
* Returns a single Channel, or else a NOT_FOUND code. * Returns a single Channel, or else a NOT_FOUND code.
*/ */
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall;
GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetChannelResponse__Output) => void): grpc.ClientUnaryCall; GetChannel(argument: _grpc_channelz_v1_GetChannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetChannelResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Returns a single Server, or else a NOT_FOUND code. * Returns a single Server, or else a NOT_FOUND code.
*/ */
GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; GetServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; GetServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
GetServer(argument: _grpc_channelz_v1_GetServerRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; GetServer(argument: _grpc_channelz_v1_GetServerRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Returns a single Server, or else a NOT_FOUND code. * Returns a single Server, or else a NOT_FOUND code.
*/ */
getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; getServer(argument: _grpc_channelz_v1_GetServerRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; getServer(argument: _grpc_channelz_v1_GetServerRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
getServer(argument: _grpc_channelz_v1_GetServerRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerResponse__Output) => void): grpc.ClientUnaryCall; getServer(argument: _grpc_channelz_v1_GetServerRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Gets all server sockets that exist in the process. * Gets all server sockets that exist in the process.
*/ */
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; GetServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Gets all server sockets that exist in the process. * Gets all server sockets that exist in the process.
*/ */
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServerSocketsResponse__Output) => void): grpc.ClientUnaryCall; getServerSockets(argument: _grpc_channelz_v1_GetServerSocketsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServerSocketsResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Gets all servers that exist in the process. * Gets all servers that exist in the process.
*/ */
GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; GetServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; GetServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
GetServers(argument: _grpc_channelz_v1_GetServersRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; GetServers(argument: _grpc_channelz_v1_GetServersRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Gets all servers that exist in the process. * Gets all servers that exist in the process.
*/ */
getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; getServers(argument: _grpc_channelz_v1_GetServersRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; getServers(argument: _grpc_channelz_v1_GetServersRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
getServers(argument: _grpc_channelz_v1_GetServersRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetServersResponse__Output) => void): grpc.ClientUnaryCall; getServers(argument: _grpc_channelz_v1_GetServersRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetServersResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Returns a single Socket or else a NOT_FOUND code. * Returns a single Socket or else a NOT_FOUND code.
*/ */
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; GetSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Returns a single Socket or else a NOT_FOUND code. * Returns a single Socket or else a NOT_FOUND code.
*/ */
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; getSocket(argument: _grpc_channelz_v1_GetSocketRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; getSocket(argument: _grpc_channelz_v1_GetSocketRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
getSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSocketResponse__Output) => void): grpc.ClientUnaryCall; getSocket(argument: _grpc_channelz_v1_GetSocketRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSocketResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Returns a single Subchannel, or else a NOT_FOUND code. * Returns a single Subchannel, or else a NOT_FOUND code.
*/ */
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; GetSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Returns a single Subchannel, or else a NOT_FOUND code. * Returns a single Subchannel, or else a NOT_FOUND code.
*/ */
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetSubchannelResponse__Output) => void): grpc.ClientUnaryCall; getSubchannel(argument: _grpc_channelz_v1_GetSubchannelRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetSubchannelResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Gets all root channels (i.e. channels the application has directly * Gets all root channels (i.e. channels the application has directly
* created). This does not include subchannels nor non-top level channels. * created). This does not include subchannels nor non-top level channels.
*/ */
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; GetTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
/** /**
* Gets all root channels (i.e. channels the application has directly * Gets all root channels (i.e. channels the application has directly
* created). This does not include subchannels nor non-top level channels. * created). This does not include subchannels nor non-top level channels.
*/ */
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: (error?: grpc.ServiceError, result?: _grpc_channelz_v1_GetTopChannelsResponse__Output) => void): grpc.ClientUnaryCall; getTopChannels(argument: _grpc_channelz_v1_GetTopChannelsRequest, callback: grpc.requestCallback<_grpc_channelz_v1_GetTopChannelsResponse__Output>): grpc.ClientUnaryCall;
} }

View File

@ -48,13 +48,14 @@ function validate(key: string, value?: MetadataValue): void {
if (!isLegalKey(key)) { if (!isLegalKey(key)) {
throw new Error('Metadata key "' + key + '" contains illegal characters'); throw new Error('Metadata key "' + key + '" contains illegal characters');
} }
if (value !== null && value !== undefined) { if (value !== null && value !== undefined) {
if (isBinaryKey(key)) { if (isBinaryKey(key)) {
if (!(value instanceof Buffer)) { if (!Buffer.isBuffer(value)) {
throw new Error("keys that end with '-bin' must have Buffer values"); throw new Error("keys that end with '-bin' must have Buffer values");
} }
} else { } else {
if (value instanceof Buffer) { if (Buffer.isBuffer(value)) {
throw new Error( throw new Error(
"keys that don't end with '-bin' must have String values" "keys that don't end with '-bin' must have String values"
); );
@ -88,13 +89,9 @@ export class Metadata {
protected internalRepr: MetadataObject = new Map<string, MetadataValue[]>(); protected internalRepr: MetadataObject = new Map<string, MetadataValue[]>();
private options: MetadataOptions; private options: MetadataOptions;
constructor(options?: MetadataOptions) { constructor(options: MetadataOptions = {}) {
if (options === undefined) {
this.options = {};
} else {
this.options = options; this.options = options;
} }
}
/** /**
* Sets the given value for the given key by replacing any other values * Sets the given value for the given key by replacing any other values
@ -120,9 +117,7 @@ export class Metadata {
key = normalizeKey(key); key = normalizeKey(key);
validate(key, value); validate(key, value);
const existingValue: MetadataValue[] | undefined = this.internalRepr.get( const existingValue: MetadataValue[] | undefined = this.internalRepr.get(key);
key
);
if (existingValue === undefined) { if (existingValue === undefined) {
this.internalRepr.set(key, [value]); this.internalRepr.set(key, [value]);
@ -137,7 +132,7 @@ export class Metadata {
*/ */
remove(key: string): void { remove(key: string): void {
key = normalizeKey(key); key = normalizeKey(key);
validate(key); // validate(key);
this.internalRepr.delete(key); this.internalRepr.delete(key);
} }
@ -148,7 +143,7 @@ export class Metadata {
*/ */
get(key: string): MetadataValue[] { get(key: string): MetadataValue[] {
key = normalizeKey(key); key = normalizeKey(key);
validate(key); // validate(key);
return this.internalRepr.get(key) || []; return this.internalRepr.get(key) || [];
} }
@ -160,12 +155,12 @@ export class Metadata {
getMap(): { [key: string]: MetadataValue } { getMap(): { [key: string]: MetadataValue } {
const result: { [key: string]: MetadataValue } = {}; const result: { [key: string]: MetadataValue } = {};
this.internalRepr.forEach((values, key) => { for (const [key, values] of this.internalRepr) {
if (values.length > 0) { if (values.length > 0) {
const v = values[0]; const v = values[0];
result[key] = v instanceof Buffer ? v.slice() : v; result[key] = Buffer.isBuffer(v) ? Buffer.from(v) : v;
}
} }
});
return result; return result;
} }
@ -177,9 +172,9 @@ export class Metadata {
const newMetadata = new Metadata(this.options); const newMetadata = new Metadata(this.options);
const newInternalRepr = newMetadata.internalRepr; const newInternalRepr = newMetadata.internalRepr;
this.internalRepr.forEach((value, key) => { for (const [key, value] of this.internalRepr) {
const clonedValue: MetadataValue[] = value.map((v) => { const clonedValue: MetadataValue[] = value.map((v) => {
if (v instanceof Buffer) { if (Buffer.isBuffer(v)) {
return Buffer.from(v); return Buffer.from(v);
} else { } else {
return v; return v;
@ -187,7 +182,7 @@ export class Metadata {
}); });
newInternalRepr.set(key, clonedValue); newInternalRepr.set(key, clonedValue);
}); }
return newMetadata; return newMetadata;
} }
@ -200,13 +195,13 @@ export class Metadata {
* @param other A Metadata object. * @param other A Metadata object.
*/ */
merge(other: Metadata): void { merge(other: Metadata): void {
other.internalRepr.forEach((values, key) => { for (const [key, values] of other.internalRepr) {
const mergedValue: MetadataValue[] = ( const mergedValue: MetadataValue[] = (
this.internalRepr.get(key) || [] this.internalRepr.get(key) || []
).concat(values); ).concat(values);
this.internalRepr.set(key, mergedValue); this.internalRepr.set(key, mergedValue);
}); }
} }
setOptions(options: MetadataOptions) { setOptions(options: MetadataOptions) {
@ -223,17 +218,13 @@ export class Metadata {
toHttp2Headers(): http2.OutgoingHttpHeaders { toHttp2Headers(): http2.OutgoingHttpHeaders {
// NOTE: Node <8.9 formats http2 headers incorrectly. // NOTE: Node <8.9 formats http2 headers incorrectly.
const result: http2.OutgoingHttpHeaders = {}; const result: http2.OutgoingHttpHeaders = {};
this.internalRepr.forEach((values, key) => {
for (const [key, values] of this.internalRepr) {
// We assume that the user's interaction with this object is limited to // We assume that the user's interaction with this object is limited to
// through its public API (i.e. keys and values are already validated). // through its public API (i.e. keys and values are already validated).
result[key] = values.map((value) => { result[key] = values.map(bufToString);
if (value instanceof Buffer) {
return value.toString('base64');
} else {
return value;
} }
});
});
return result; return result;
} }
@ -248,7 +239,7 @@ export class Metadata {
*/ */
toJSON() { toJSON() {
const result: { [key: string]: MetadataValue[] } = {}; const result: { [key: string]: MetadataValue[] } = {};
for (const [key, values] of this.internalRepr.entries()) { for (const [key, values] of this.internalRepr) {
result[key] = values; result[key] = values;
} }
return result; return result;
@ -261,10 +252,10 @@ export class Metadata {
*/ */
static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata { static fromHttp2Headers(headers: http2.IncomingHttpHeaders): Metadata {
const result = new Metadata(); const result = new Metadata();
Object.keys(headers).forEach((key) => { for (const key of Object.keys(headers)) {
// Reserved headers (beginning with `:`) are not valid keys. // Reserved headers (beginning with `:`) are not valid keys.
if (key.charAt(0) === ':') { if (key.charAt(0) === ':') {
return; continue;
} }
const values = headers[key]; const values = headers[key];
@ -297,7 +288,12 @@ export class Metadata {
const message = `Failed to add metadata entry ${key}: ${values}. ${error.message}. For more information see https://github.com/grpc/grpc-node/issues/1173`; const message = `Failed to add metadata entry ${key}: ${values}. ${error.message}. For more information see https://github.com/grpc/grpc-node/issues/1173`;
log(LogVerbosity.ERROR, message); log(LogVerbosity.ERROR, message);
} }
}); }
return result; return result;
} }
} }
const bufToString = (val: string | Buffer): string => {
return Buffer.isBuffer(val) ? val.toString('base64') : val
};

View File

@ -19,8 +19,9 @@ import { EventEmitter } from 'events';
import * as http2 from 'http2'; import * as http2 from 'http2';
import { Duplex, Readable, Writable } from 'stream'; import { Duplex, Readable, Writable } from 'stream';
import * as zlib from 'zlib'; import * as zlib from 'zlib';
import { promisify } from 'util';
import { Deadline, StatusObject } from './call-stream'; import { Deadline, StatusObject, PartialStatusObject } from './call-stream';
import { import {
Status, Status,
DEFAULT_MAX_SEND_MESSAGE_LENGTH, DEFAULT_MAX_SEND_MESSAGE_LENGTH,
@ -35,6 +36,8 @@ import { ChannelOptions } from './channel-options';
import * as logging from './logging'; import * as logging from './logging';
const TRACER_NAME = 'server_call'; const TRACER_NAME = 'server_call';
const unzip = promisify(zlib.unzip);
const inflate = promisify(zlib.inflate);
function trace(text: string): void { function trace(text: string): void {
logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text); logging.trace(LogVerbosity.DEBUG, TRACER_NAME, text);
@ -86,14 +89,10 @@ export type ServerSurfaceCall = {
export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & { export type ServerUnaryCall<RequestType, ResponseType> = ServerSurfaceCall & {
request: RequestType; request: RequestType;
}; };
export type ServerReadableStream< export type ServerReadableStream<RequestType, ResponseType> =
RequestType, ServerSurfaceCall & ObjectReadable<RequestType>;
ResponseType export type ServerWritableStream<RequestType, ResponseType> =
> = ServerSurfaceCall & ObjectReadable<RequestType>; ServerSurfaceCall &
export type ServerWritableStream<
RequestType,
ResponseType
> = ServerSurfaceCall &
ObjectWritable<ResponseType> & { ObjectWritable<ResponseType> & {
request: RequestType; request: RequestType;
end: (metadata?: Metadata) => void; end: (metadata?: Metadata) => void;
@ -104,7 +103,8 @@ export type ServerDuplexStream<RequestType, ResponseType> = ServerSurfaceCall &
export class ServerUnaryCallImpl<RequestType, ResponseType> export class ServerUnaryCallImpl<RequestType, ResponseType>
extends EventEmitter extends EventEmitter
implements ServerUnaryCall<RequestType, ResponseType> { implements ServerUnaryCall<RequestType, ResponseType>
{
cancelled: boolean; cancelled: boolean;
constructor( constructor(
@ -136,7 +136,8 @@ export class ServerUnaryCallImpl<RequestType, ResponseType>
export class ServerReadableStreamImpl<RequestType, ResponseType> export class ServerReadableStreamImpl<RequestType, ResponseType>
extends Readable extends Readable
implements ServerReadableStream<RequestType, ResponseType> { implements ServerReadableStream<RequestType, ResponseType>
{
cancelled: boolean; cancelled: boolean;
constructor( constructor(
@ -178,7 +179,8 @@ export class ServerReadableStreamImpl<RequestType, ResponseType>
export class ServerWritableStreamImpl<RequestType, ResponseType> export class ServerWritableStreamImpl<RequestType, ResponseType>
extends Writable extends Writable
implements ServerWritableStream<RequestType, ResponseType> { implements ServerWritableStream<RequestType, ResponseType>
{
cancelled: boolean; cancelled: boolean;
private trailingMetadata: Metadata; private trailingMetadata: Metadata;
@ -257,7 +259,8 @@ export class ServerWritableStreamImpl<RequestType, ResponseType>
export class ServerDuplexStreamImpl<RequestType, ResponseType> export class ServerDuplexStreamImpl<RequestType, ResponseType>
extends Duplex extends Duplex
implements ServerDuplexStream<RequestType, ResponseType> { implements ServerDuplexStream<RequestType, ResponseType>
{
cancelled: boolean; cancelled: boolean;
private trailingMetadata: Metadata; private trailingMetadata: Metadata;
@ -395,7 +398,8 @@ export class Http2ServerCallStream<
ResponseType ResponseType
> extends EventEmitter { > extends EventEmitter {
cancelled = false; cancelled = false;
deadlineTimer: NodeJS.Timer = setTimeout(() => {}, 0); deadlineTimer: NodeJS.Timer | null = null;
private statusSent = false;
private deadline: Deadline = Infinity; private deadline: Deadline = Infinity;
private wantTrailers = false; private wantTrailers = false;
private metadataSent = false; private metadataSent = false;
@ -428,10 +432,20 @@ export class Http2ServerCallStream<
' stream closed with rstCode ' + ' stream closed with rstCode ' +
this.stream.rstCode this.stream.rstCode
); );
if (!this.statusSent) {
this.cancelled = true; this.cancelled = true;
this.emit('cancelled', 'cancelled'); this.emit('cancelled', 'cancelled');
this.emit('streamEnd', false); this.emit('streamEnd', false);
this.sendStatus({code: Status.CANCELLED, details: 'Cancelled by client', metadata: new Metadata()}); this.sendStatus({
code: Status.CANCELLED,
details: 'Cancelled by client',
metadata: null,
});
}
// to compensate for a fact that cancelled is not always called
this.emit('close');
}); });
this.stream.on('drain', () => { this.stream.on('drain', () => {
@ -444,9 +458,6 @@ export class Http2ServerCallStream<
if ('grpc.max_receive_message_length' in options) { if ('grpc.max_receive_message_length' in options) {
this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!; this.maxReceiveMessageSize = options['grpc.max_receive_message_length']!;
} }
// Clear noop timer
clearTimeout(this.deadlineTimer);
} }
private checkCancelled(): boolean { private checkCancelled(): boolean {
@ -458,52 +469,22 @@ export class Http2ServerCallStream<
return this.cancelled; return this.cancelled;
} }
private getDecompressedMessage(message: Buffer, encoding: string) { private getDecompressedMessage(
switch (encoding) { message: Buffer,
case 'deflate': { encoding: string
return new Promise<Buffer | undefined>((resolve, reject) => { ): Buffer | Promise<Buffer> {
zlib.inflate(message.slice(5), (err, output) => { if (encoding === 'deflate') {
if (err) { return inflate(message.subarray(5));
this.sendError({ } else if (encoding === 'gzip') {
code: Status.INTERNAL, return unzip(message.subarray(5));
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`, } else if (encoding === 'identity') {
}); return message.subarray(5);
resolve();
} else {
resolve(output);
}
});
});
} }
case 'gzip': { return Promise.reject({
return new Promise<Buffer | undefined>((resolve, reject) => {
zlib.unzip(message.slice(5), (err, output) => {
if (err) {
this.sendError({
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
});
resolve();
} else {
resolve(output);
}
});
});
}
case 'identity': {
return Promise.resolve(message.slice(5));
}
default: {
this.sendError({
code: Status.UNIMPLEMENTED, code: Status.UNIMPLEMENTED,
details: `Received message compressed with unsupported encoding "${encoding}"`, details: `Received message compressed with unsupported encoding "${encoding}"`,
}); });
return Promise.resolve();
}
}
} }
sendMetadata(customMetadata?: Metadata) { sendMetadata(customMetadata?: Metadata) {
@ -518,13 +499,22 @@ export class Http2ServerCallStream<
this.metadataSent = true; this.metadataSent = true;
const custom = customMetadata ? customMetadata.toHttp2Headers() : null; const custom = customMetadata ? customMetadata.toHttp2Headers() : null;
// TODO(cjihrig): Include compression headers. // TODO(cjihrig): Include compression headers.
const headers = Object.assign({}, defaultResponseHeaders, custom); const headers = { ...defaultResponseHeaders, ...custom };
this.stream.respond(headers, defaultResponseOptions); this.stream.respond(headers, defaultResponseOptions);
} }
receiveMetadata(headers: http2.IncomingHttpHeaders) { receiveMetadata(headers: http2.IncomingHttpHeaders) {
const metadata = Metadata.fromHttp2Headers(headers); const metadata = Metadata.fromHttp2Headers(headers);
if (logging.isTracerEnabled(TRACER_NAME)) {
trace(
'Request to ' +
this.handler.path +
' received headers ' +
JSON.stringify(metadata.toJSON())
);
}
// TODO(cjihrig): Receive compression metadata. // TODO(cjihrig): Receive compression metadata.
const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER); const timeoutHeader = metadata.get(GRPC_TIMEOUT_HEADER);
@ -556,52 +546,95 @@ export class Http2ServerCallStream<
return metadata; return metadata;
} }
receiveUnaryMessage(encoding: string): Promise<RequestType> { receiveUnaryMessage(
return new Promise((resolve, reject) => { encoding: string,
const stream = this.stream; next: (
const chunks: Buffer[] = []; err: Partial<ServerStatusResponse> | null,
let totalLength = 0; request?: RequestType
) => void
): void {
const { stream } = this;
stream.on('data', (data: Buffer) => { let receivedLength = 0;
chunks.push(data); const call = this;
totalLength += data.byteLength; const body: Buffer[] = [];
}); const limit = this.maxReceiveMessageSize;
stream.once('end', async () => { stream.on('data', onData);
try { stream.on('end', onEnd);
const requestBytes = Buffer.concat(chunks, totalLength); stream.on('error', onEnd);
if (
this.maxReceiveMessageSize !== -1 && function onData(chunk: Buffer) {
requestBytes.length > this.maxReceiveMessageSize receivedLength += chunk.byteLength;
) {
this.sendError({ if (limit !== -1 && receivedLength > limit) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
next({
code: Status.RESOURCE_EXHAUSTED, code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${requestBytes.length} vs. ${this.maxReceiveMessageSize})`, details: `Received message larger than max (${receivedLength} vs. ${limit})`,
}); });
resolve(); return;
} }
this.emit('receiveMessage'); body.push(chunk);
}
function onEnd(err?: Error) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
if (err !== undefined) {
next({ code: Status.INTERNAL, details: err.message });
return;
}
if (receivedLength === 0) {
next({ code: Status.INTERNAL, details: 'received empty unary message' })
return;
}
call.emit('receiveMessage');
const requestBytes = Buffer.concat(body, receivedLength);
const compressed = requestBytes.readUInt8(0) === 1; const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity'; const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(requestBytes, compressedMessageEncoding); const decompressedMessage = call.getDecompressedMessage(
requestBytes,
compressedMessageEncoding
);
// Encountered an error with decompression; it'll already have been propogated back if (Buffer.isBuffer(decompressedMessage)) {
// Just return early call.safeDeserializeMessage(decompressedMessage, next);
if (!decompressedMessage) { return;
resolve();
} }
else {
resolve(this.deserializeMessage(decompressedMessage)); decompressedMessage.then(
(decompressed) => call.safeDeserializeMessage(decompressed, next),
(err: any) => next(
err.code
? err
: {
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
} }
)
)
}
}
private safeDeserializeMessage(
buffer: Buffer,
next: (err: Partial<ServerStatusResponse> | null, request?: RequestType) => void
) {
try {
next(null, this.deserializeMessage(buffer));
} catch (err) { } catch (err) {
err.code = Status.INTERNAL; err.code = Status.INTERNAL;
this.sendError(err); next(err);
resolve();
} }
});
});
} }
serializeMessage(value: ResponseType) { serializeMessage(value: ResponseType) {
@ -623,18 +656,19 @@ export class Http2ServerCallStream<
async sendUnaryMessage( async sendUnaryMessage(
err: ServerErrorResponse | ServerStatusResponse | null, err: ServerErrorResponse | ServerStatusResponse | null,
value?: ResponseType | null, value?: ResponseType | null,
metadata?: Metadata, metadata?: Metadata | null,
flags?: number flags?: number
) { ) {
if (this.checkCancelled()) { if (this.checkCancelled()) {
return; return;
} }
if (!metadata) {
metadata = new Metadata(); if (metadata === undefined) {
metadata = null;
} }
if (err) { if (err) {
if (!Object.prototype.hasOwnProperty.call(err, 'metadata')) { if (!Object.prototype.hasOwnProperty.call(err, 'metadata') && metadata) {
err.metadata = metadata; err.metadata = metadata;
} }
this.sendError(err); this.sendError(err);
@ -652,7 +686,7 @@ export class Http2ServerCallStream<
} }
} }
sendStatus(statusObj: StatusObject) { sendStatus(statusObj: PartialStatusObject) {
this.emit('callEnd', statusObj.code); this.emit('callEnd', statusObj.code);
this.emit('streamEnd', statusObj.code === Status.OK); this.emit('streamEnd', statusObj.code === Status.OK);
if (this.checkCancelled()) { if (this.checkCancelled()) {
@ -668,20 +702,19 @@ export class Http2ServerCallStream<
statusObj.details statusObj.details
); );
clearTimeout(this.deadlineTimer); if (this.deadlineTimer) clearTimeout(this.deadlineTimer);
if (!this.wantTrailers) { if (!this.wantTrailers) {
this.wantTrailers = true; this.wantTrailers = true;
this.stream.once('wantTrailers', () => { this.stream.once('wantTrailers', () => {
const trailersToSend = Object.assign( const trailersToSend = {
{
[GRPC_STATUS_HEADER]: statusObj.code, [GRPC_STATUS_HEADER]: statusObj.code,
[GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details as string), [GRPC_MESSAGE_HEADER]: encodeURI(statusObj.details),
}, ...statusObj.metadata?.toHttp2Headers(),
statusObj.metadata.toHttp2Headers() };
);
this.stream.sendTrailers(trailersToSend); this.stream.sendTrailers(trailersToSend);
this.statusSent = true;
}); });
this.sendMetadata(); this.sendMetadata();
this.stream.end(); this.stream.end();
@ -689,13 +722,13 @@ export class Http2ServerCallStream<
} }
sendError(error: ServerErrorResponse | ServerStatusResponse) { sendError(error: ServerErrorResponse | ServerStatusResponse) {
const status: StatusObject = { const status: PartialStatusObject = {
code: Status.UNKNOWN, code: Status.UNKNOWN,
details: 'message' in error ? error.message : 'Unknown Error', details: 'message' in error ? error.message : 'Unknown Error',
metadata: metadata:
'metadata' in error && error.metadata !== undefined 'metadata' in error && error.metadata !== undefined
? error.metadata ? error.metadata
: new Metadata(), : null,
}; };
if ( if (
@ -744,6 +777,9 @@ export class Http2ServerCallStream<
call.emit('cancelled', reason); call.emit('cancelled', reason);
}); });
// to compensate for the fact that cancelled is no longer always called
this.once('close', () => call.emit('close'))
this.once('callEnd', (status) => call.emit('callEnd', status)); this.once('callEnd', (status) => call.emit('callEnd', status));
} }
@ -766,7 +802,7 @@ export class Http2ServerCallStream<
pushedEnd = true; pushedEnd = true;
this.pushOrBufferMessage(readable, null); this.pushOrBufferMessage(readable, null);
} }
} };
this.stream.on('data', async (data: Buffer) => { this.stream.on('data', async (data: Buffer) => {
const messages = decoder.write(data); const messages = decoder.write(data);
@ -788,7 +824,10 @@ export class Http2ServerCallStream<
const compressed = message.readUInt8(0) === 1; const compressed = message.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity'; const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = await this.getDecompressedMessage(message, compressedMessageEncoding); const decompressedMessage = await this.getDecompressedMessage(
message,
compressedMessageEncoding
);
// Encountered an error with decompression; it'll already have been propogated back // Encountered an error with decompression; it'll already have been propogated back
// Just return early // Just return early

View File

@ -62,6 +62,10 @@ import { parseUri } from './uri-parser';
import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz'; import { ChannelzCallTracker, ChannelzChildrenTracker, ChannelzTrace, registerChannelzServer, registerChannelzSocket, ServerInfo, ServerRef, SocketInfo, SocketRef, TlsInfo, unregisterChannelzRef } from './channelz';
import { CipherNameAndProtocol, TLSSocket } from 'tls'; import { CipherNameAndProtocol, TLSSocket } from 'tls';
const {
HTTP2_HEADER_PATH
} = http2.constants
const TRACER_NAME = 'server'; const TRACER_NAME = 'server';
interface BindResult { interface BindResult {
@ -77,7 +81,6 @@ function getUnimplementedStatusResponse(
return { return {
code: Status.UNIMPLEMENTED, code: Status.UNIMPLEMENTED,
details: `The server does not implement the method ${methodName}`, details: `The server does not implement the method ${methodName}`,
metadata: new Metadata(),
}; };
} }
@ -147,6 +150,7 @@ export class Server {
private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>(); private sessions = new Map<http2.ServerHttp2Session, ChannelzSessionInfo>();
private started = false; private started = false;
private options: ChannelOptions; private options: ChannelOptions;
private serverAddressString: string = 'null'
// Channelz Info // Channelz Info
private readonly channelzEnabled: boolean = true; private readonly channelzEnabled: boolean = true;
@ -165,6 +169,7 @@ export class Server {
if (this.channelzEnabled) { if (this.channelzEnabled) {
this.channelzTrace.addTrace('CT_INFO', 'Server created'); this.channelzTrace.addTrace('CT_INFO', 'Server created');
} }
this.trace('Server constructed'); this.trace('Server constructed');
} }
@ -730,21 +735,7 @@ export class Server {
return this.channelzRef; return this.channelzRef;
} }
private _setupHandlers( private _verifyContentType(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders): boolean {
http2Server: http2.Http2Server | http2.Http2SecureServer
): void {
if (http2Server === null) {
return;
}
http2Server.on(
'stream',
(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) => {
const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session);
if (this.channelzEnabled) {
this.callTracker.addCallStarted();
channelzSessionInfo?.streamTracker.addCallStarted();
}
const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE];
if ( if (
@ -758,33 +749,22 @@ export class Server {
}, },
{ endStream: true } { endStream: true }
); );
this.callTracker.addCallFailed(); return false
if (this.channelzEnabled) {
channelzSessionInfo?.streamTracker.addCallFailed();
}
return;
} }
let call: Http2ServerCallStream<any, any> | null = null; return true
}
private _retrieveHandler(headers: http2.IncomingHttpHeaders): Handler<any, any> {
const path = headers[HTTP2_HEADER_PATH] as string
try {
const path = headers[http2.constants.HTTP2_HEADER_PATH] as string;
const serverAddress = http2Server.address();
let serverAddressString = 'null';
if (serverAddress) {
if (typeof serverAddress === 'string') {
serverAddressString = serverAddress;
} else {
serverAddressString =
serverAddress.address + ':' + serverAddress.port;
}
}
this.trace( this.trace(
'Received call to method ' + 'Received call to method ' +
path + path +
' at address ' + ' at address ' +
serverAddressString this.serverAddressString
); );
const handler = this.handlers.get(path); const handler = this.handlers.get(path);
if (handler === undefined) { if (handler === undefined) {
@ -796,7 +776,50 @@ export class Server {
throw getUnimplementedStatusResponse(path); throw getUnimplementedStatusResponse(path);
} }
call = new Http2ServerCallStream(stream, handler, this.options); return handler
}
private _respondWithError<T extends Partial<ServiceError>>(
err: T,
stream: http2.ServerHttp2Stream,
channelzSessionInfo: ChannelzSessionInfo | null = null
) {
const call = new Http2ServerCallStream(stream, null!, this.options);
if (err.code === undefined) {
err.code = Status.INTERNAL;
}
if (this.channelzEnabled) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed()
}
call.sendError(err);
}
private _channelzHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) {
const channelzSessionInfo = this.sessions.get(stream.session as http2.ServerHttp2Session);
this.callTracker.addCallStarted();
channelzSessionInfo?.streamTracker.addCallStarted();
if (!this._verifyContentType(stream, headers)) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed();
return
}
let handler: Handler<any, any>
try {
handler = this._retrieveHandler(headers)
} catch (err) {
this._respondWithError(err, stream, channelzSessionInfo)
return
}
const call = new Http2ServerCallStream(stream, handler, this.options);
call.once('callEnd', (code: Status) => { call.once('callEnd', (code: Status) => {
if (code === Status.OK) { if (code === Status.OK) {
this.callTracker.addCallSucceeded(); this.callTracker.addCallSucceeded();
@ -804,7 +827,8 @@ export class Server {
this.callTracker.addCallFailed(); this.callTracker.addCallFailed();
} }
}); });
if (this.channelzEnabled && channelzSessionInfo) {
if (channelzSessionInfo) {
call.once('streamEnd', (success: boolean) => { call.once('streamEnd', (success: boolean) => {
if (success) { if (success) {
channelzSessionInfo.streamTracker.addCallSucceeded(); channelzSessionInfo.streamTracker.addCallSucceeded();
@ -821,59 +845,100 @@ export class Server {
channelzSessionInfo.lastMessageReceivedTimestamp = new Date(); channelzSessionInfo.lastMessageReceivedTimestamp = new Date();
}); });
} }
if (!this._runHandlerForCall(call, handler, headers)) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed()
call.sendError({
code: Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`
});
}
}
private _streamHandler(stream: http2.ServerHttp2Stream, headers: http2.IncomingHttpHeaders) {
if (this._verifyContentType(stream, headers) !== true) {
return
}
let handler: Handler<any, any>
try {
handler = this._retrieveHandler(headers)
} catch (err) {
this._respondWithError(err, stream, null)
return
}
const call = new Http2ServerCallStream(stream, handler, this.options)
if (!this._runHandlerForCall(call, handler, headers)) {
call.sendError({
code: Status.INTERNAL,
details: `Unknown handler type: ${handler.type}`
});
}
}
private _runHandlerForCall(call: Http2ServerCallStream<any, any>, handler: Handler<any, any>, headers: http2.IncomingHttpHeaders): boolean {
const metadata = call.receiveMetadata(headers); const metadata = call.receiveMetadata(headers);
const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity'; const encoding = (metadata.get('grpc-encoding')[0] as string | undefined) ?? 'identity';
metadata.remove('grpc-encoding'); metadata.remove('grpc-encoding');
switch (handler.type) { const { type } = handler
case 'unary': if (type === 'unary') {
handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding); handleUnary(call, handler as UntypedUnaryHandler, metadata, encoding);
break; } else if (type === 'clientStream') {
case 'clientStream':
handleClientStreaming( handleClientStreaming(
call, call,
handler as UntypedClientStreamingHandler, handler as UntypedClientStreamingHandler,
metadata, metadata,
encoding encoding
); );
break; } else if (type === 'serverStream') {
case 'serverStream':
handleServerStreaming( handleServerStreaming(
call, call,
handler as UntypedServerStreamingHandler, handler as UntypedServerStreamingHandler,
metadata, metadata,
encoding encoding
); );
break; } else if (type === 'bidi') {
case 'bidi':
handleBidiStreaming( handleBidiStreaming(
call, call,
handler as UntypedBidiStreamingHandler, handler as UntypedBidiStreamingHandler,
metadata, metadata,
encoding encoding
); );
break; } else {
default: return false
throw new Error(`Unknown handler type: ${handler.type}`);
}
} catch (err) {
if (!call) {
call = new Http2ServerCallStream(stream, null!, this.options);
if (this.channelzEnabled) {
this.callTracker.addCallFailed();
channelzSessionInfo?.streamTracker.addCallFailed()
}
} }
if (err.code === undefined) { return true
err.code = Status.INTERNAL;
} }
call.sendError(err); private _setupHandlers(
http2Server: http2.Http2Server | http2.Http2SecureServer
): void {
if (http2Server === null) {
return;
} }
}
);
const serverAddress = http2Server.address();
let serverAddressString = 'null'
if (serverAddress) {
if (typeof serverAddress === 'string') {
serverAddressString = serverAddress
} else {
serverAddressString =
serverAddress.address + ':' + serverAddress.port
}
}
this.serverAddressString = serverAddressString
const handler = this.channelzEnabled
? this._channelzHandler
: this._streamHandler
http2Server.on('stream', handler.bind(this))
http2Server.on('session', (session) => { http2Server.on('session', (session) => {
if (!this.started) { if (!this.started) {
session.destroy(); session.destroy();
@ -910,13 +975,17 @@ export class Server {
} }
} }
async function handleUnary<RequestType, ResponseType>( function handleUnary<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>, call: Http2ServerCallStream<RequestType, ResponseType>,
handler: UnaryHandler<RequestType, ResponseType>, handler: UnaryHandler<RequestType, ResponseType>,
metadata: Metadata, metadata: Metadata,
encoding: string encoding: string
): Promise<void> { ): void {
const request = await call.receiveUnaryMessage(encoding); call.receiveUnaryMessage(encoding, (err, request) => {
if (err) {
call.sendError(err)
return
}
if (request === undefined || call.cancelled) { if (request === undefined || call.cancelled) {
return; return;
@ -939,6 +1008,7 @@ async function handleUnary<RequestType, ResponseType>(
call.sendUnaryMessage(err, value, trailer, flags); call.sendUnaryMessage(err, value, trailer, flags);
} }
); );
});
} }
function handleClientStreaming<RequestType, ResponseType>( function handleClientStreaming<RequestType, ResponseType>(
@ -972,13 +1042,17 @@ function handleClientStreaming<RequestType, ResponseType>(
handler.func(stream, respond); handler.func(stream, respond);
} }
async function handleServerStreaming<RequestType, ResponseType>( function handleServerStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>, call: Http2ServerCallStream<RequestType, ResponseType>,
handler: ServerStreamingHandler<RequestType, ResponseType>, handler: ServerStreamingHandler<RequestType, ResponseType>,
metadata: Metadata, metadata: Metadata,
encoding: string encoding: string
): Promise<void> { ): void {
const request = await call.receiveUnaryMessage(encoding); call.receiveUnaryMessage(encoding, (err, request) => {
if (err) {
call.sendError(err)
return
}
if (request === undefined || call.cancelled) { if (request === undefined || call.cancelled) {
return; return;
@ -992,6 +1066,7 @@ async function handleServerStreaming<RequestType, ResponseType>(
); );
handler.func(stream); handler.func(stream);
});
} }
function handleBidiStreaming<RequestType, ResponseType>( function handleBidiStreaming<RequestType, ResponseType>(