promisify receiveUnaryMessage server-call

This commit is contained in:
Cedric Kassen 2023-07-12 13:32:35 +02:00
parent 1cc36e8df8
commit 14b18a4bba
2 changed files with 116 additions and 127 deletions

View File

@ -553,102 +553,100 @@ export class Http2ServerCallStream<
return metadata;
}
receiveUnaryMessage(
encoding: string,
next: (
err: Partial<ServerStatusResponse> | null,
request?: RequestType
) => void
): void {
const { stream } = this;
receiveUnaryMessage(encoding: string): Promise<RequestType | void> {
return new Promise((resolve, reject) => {
const { stream } = this;
let receivedLength = 0;
let receivedLength = 0;
// eslint-disable-next-line @typescript-eslint/no-this-alias
const call = this;
const body: Buffer[] = [];
const limit = this.maxReceiveMessageSize;
// eslint-disable-next-line @typescript-eslint/no-this-alias
const call = this;
const body: Buffer[] = [];
const limit = this.maxReceiveMessageSize;
stream.on('data', onData);
stream.on('end', onEnd);
stream.on('error', onEnd);
this.stream.on('data', onData);
this.stream.on('end', onEnd);
this.stream.on('error', onEnd);
function onData(chunk: Buffer) {
receivedLength += chunk.byteLength;
async function onData(chunk: Buffer) {
receivedLength += chunk.byteLength;
if (limit !== -1 && receivedLength > limit) {
if (limit !== -1 && receivedLength > limit) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
reject({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${receivedLength} vs. ${limit})`,
});
return;
}
body.push(chunk);
}
async function onEnd(err?: Error) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
next({
code: Status.RESOURCE_EXHAUSTED,
details: `Received message larger than max (${receivedLength} vs. ${limit})`,
});
return;
if (err !== undefined) {
reject({ code: Status.INTERNAL, details: err.message });
return;
}
if (receivedLength === 0) {
reject({
code: Status.INTERNAL,
details: 'received empty unary message',
});
return;
}
call.emit('receiveMessage');
const requestBytes = Buffer.concat(body, receivedLength);
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = call.getDecompressedMessage(
requestBytes,
compressedMessageEncoding
);
if (Buffer.isBuffer(decompressedMessage)) {
call.safeDeserializeMessage(decompressedMessage, resolve, reject);
return;
}
decompressedMessage.then(
decompressed =>
call.safeDeserializeMessage(decompressed, resolve, reject),
(err: any) =>
reject(
err.code
? err
: {
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
}
)
);
}
body.push(chunk);
}
function onEnd(err?: Error) {
stream.removeListener('data', onData);
stream.removeListener('end', onEnd);
stream.removeListener('error', onEnd);
if (err !== undefined) {
next({ code: Status.INTERNAL, details: err.message });
return;
}
if (receivedLength === 0) {
next({
code: Status.INTERNAL,
details: 'received empty unary message',
});
return;
}
call.emit('receiveMessage');
const requestBytes = Buffer.concat(body, receivedLength);
const compressed = requestBytes.readUInt8(0) === 1;
const compressedMessageEncoding = compressed ? encoding : 'identity';
const decompressedMessage = call.getDecompressedMessage(
requestBytes,
compressedMessageEncoding
);
if (Buffer.isBuffer(decompressedMessage)) {
call.safeDeserializeMessage(decompressedMessage, next);
return;
}
decompressedMessage.then(
decompressed => call.safeDeserializeMessage(decompressed, next),
(err: any) =>
next(
err.code
? err
: {
code: Status.INTERNAL,
details: `Received "grpc-encoding" header "${encoding}" but ${encoding} decompression failed`,
}
)
);
}
});
}
private safeDeserializeMessage(
buffer: Buffer,
next: (
err: Partial<ServerStatusResponse> | null,
request?: RequestType
) => void
resolve: (
value: void | RequestType | PromiseLike<void | RequestType>
) => void,
reject: (reason: any) => void
) {
try {
next(null, this.deserializeMessage(buffer));
resolve(this.deserializeMessage(buffer));
} catch (err) {
next({
reject({
details: getErrorMessage(err),
code: Status.INTERNAL,
});

View File

@ -96,6 +96,7 @@ function getUnimplementedStatusResponse(
return {
code: Status.UNIMPLEMENTED,
details: `The server does not implement the method ${methodName}`,
metadata: new Metadata(),
};
}
@ -1176,40 +1177,35 @@ export class Server {
}
}
function handleUnary<RequestType, ResponseType>(
async function handleUnary<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: UnaryHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
): void {
call.receiveUnaryMessage(encoding, (err, request) => {
if (err) {
call.sendError(err);
return;
): Promise<void> {
const request = await call.receiveUnaryMessage(encoding);
if (request === undefined || call.cancelled) {
return;
}
const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
call,
metadata,
request
);
handler.func(
emitter,
(
err: ServerErrorResponse | ServerStatusResponse | null,
value?: ResponseType | null,
trailer?: Metadata,
flags?: number
) => {
call.sendUnaryMessage(err, value, trailer, flags);
}
if (request === undefined || call.cancelled) {
return;
}
const emitter = new ServerUnaryCallImpl<RequestType, ResponseType>(
call,
metadata,
request
);
handler.func(
emitter,
(
err: ServerErrorResponse | ServerStatusResponse | null,
value?: ResponseType | null,
trailer?: Metadata,
flags?: number
) => {
call.sendUnaryMessage(err, value, trailer, flags);
}
);
});
);
}
function handleClientStreaming<RequestType, ResponseType>(
@ -1243,31 +1239,26 @@ function handleClientStreaming<RequestType, ResponseType>(
handler.func(stream, respond);
}
function handleServerStreaming<RequestType, ResponseType>(
async function handleServerStreaming<RequestType, ResponseType>(
call: Http2ServerCallStream<RequestType, ResponseType>,
handler: ServerStreamingHandler<RequestType, ResponseType>,
metadata: Metadata,
encoding: string
): void {
call.receiveUnaryMessage(encoding, (err, request) => {
if (err) {
call.sendError(err);
return;
}
): Promise<void> {
const request = await call.receiveUnaryMessage(encoding);
if (request === undefined || call.cancelled) {
return;
}
if (request === undefined || call.cancelled) {
return;
}
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
call,
metadata,
handler.serialize,
request
);
const stream = new ServerWritableStreamImpl<RequestType, ResponseType>(
call,
metadata,
handler.serialize,
request
);
handler.func(stream);
});
handler.func(stream);
}
function handleBidiStreaming<RequestType, ResponseType>(