grpc-js: Stop leaking freed message buffer placeholder objects

This commit is contained in:
Michael Lumish 2023-02-23 17:49:03 -08:00
parent 6614ebbc45
commit 1f14d1c138
2 changed files with 34 additions and 29 deletions

View File

@ -1,6 +1,6 @@
{ {
"name": "@grpc/grpc-js", "name": "@grpc/grpc-js",
"version": "1.8.10", "version": "1.8.11",
"description": "gRPC Library for Node - pure JS implementation", "description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/", "homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",

View File

@ -151,6 +151,12 @@ export class RetryingCall implements Call {
private initialMetadata: Metadata | null = null; private initialMetadata: Metadata | null = null;
private underlyingCalls: UnderlyingCall[] = []; private underlyingCalls: UnderlyingCall[] = [];
private writeBuffer: WriteBufferEntry[] = []; private writeBuffer: WriteBufferEntry[] = [];
/**
* The offset of message indices in the writeBuffer. For example, if
* writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15
* is in writeBuffer[5].
*/
private writeBufferOffset = 0;
/** /**
* Tracks whether a read has been started, so that we know whether to start * Tracks whether a read has been started, so that we know whether to start
* reads on new child calls. This only matters for the first read, because * reads on new child calls. This only matters for the first read, because
@ -203,14 +209,8 @@ export class RetryingCall implements Call {
private reportStatus(statusObject: StatusObject) { private reportStatus(statusObject: StatusObject) {
this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"'); this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"');
this.bufferTracker.freeAll(this.callNumber); this.bufferTracker.freeAll(this.callNumber);
for (let i = 0; i < this.writeBuffer.length; i++) { this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length;
if (this.writeBuffer[i].entryType === 'MESSAGE') { this.writeBuffer = [];
this.writeBuffer[i] = {
entryType: 'FREED',
allocated: false
};
}
}
process.nextTick(() => { process.nextTick(() => {
// Explicitly construct status object to remove progress field // Explicitly construct status object to remove progress field
this.listener?.onReceiveStatus({ this.listener?.onReceiveStatus({
@ -236,20 +236,27 @@ export class RetryingCall implements Call {
} }
} }
private maybefreeMessageBufferEntry(messageIndex: number) { private getBufferEntry(messageIndex: number): WriteBufferEntry {
return this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {entryType: 'FREED', allocated: false};
}
private getNextBufferIndex() {
return this.writeBufferOffset + this.writeBuffer.length;
}
private clearSentMessages() {
if (this.state !== 'COMMITTED') { if (this.state !== 'COMMITTED') {
return; return;
} }
const bufferEntry = this.writeBuffer[messageIndex]; const earliestNeededMessageIndex = this.underlyingCalls[this.committedCallIndex!].nextMessageToSend;
if (bufferEntry.entryType === 'MESSAGE') { for (let messageIndex = this.writeBufferOffset; messageIndex < earliestNeededMessageIndex; messageIndex++) {
const bufferEntry = this.getBufferEntry(messageIndex);
if (bufferEntry.allocated) { if (bufferEntry.allocated) {
this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber); this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber);
} }
this.writeBuffer[messageIndex] = {
entryType: 'FREED',
allocated: false
};
} }
this.writeBuffer = this.writeBuffer.slice(earliestNeededMessageIndex - this.writeBufferOffset);
this.writeBufferOffset = earliestNeededMessageIndex;
} }
private commitCall(index: number) { private commitCall(index: number) {
@ -272,9 +279,7 @@ export class RetryingCall implements Call {
this.underlyingCalls[i].state = 'COMPLETED'; this.underlyingCalls[i].state = 'COMPLETED';
this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt'); this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt');
} }
for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) { this.clearSentMessages();
this.maybefreeMessageBufferEntry(messageIndex);
}
} }
private commitCallWithMostMessages() { private commitCallWithMostMessages() {
@ -555,8 +560,8 @@ export class RetryingCall implements Call {
private handleChildWriteCompleted(childIndex: number) { private handleChildWriteCompleted(childIndex: number) {
const childCall = this.underlyingCalls[childIndex]; const childCall = this.underlyingCalls[childIndex];
const messageIndex = childCall.nextMessageToSend; const messageIndex = childCall.nextMessageToSend;
this.writeBuffer[messageIndex].callback?.(); this.getBufferEntry(messageIndex).callback?.();
this.maybefreeMessageBufferEntry(messageIndex); this.clearSentMessages();
childCall.nextMessageToSend += 1; childCall.nextMessageToSend += 1;
this.sendNextChildMessage(childIndex); this.sendNextChildMessage(childIndex);
} }
@ -566,10 +571,10 @@ export class RetryingCall implements Call {
if (childCall.state === 'COMPLETED') { if (childCall.state === 'COMPLETED') {
return; return;
} }
if (this.writeBuffer[childCall.nextMessageToSend]) { if (this.getBufferEntry(childCall.nextMessageToSend)) {
const bufferEntry = this.writeBuffer[childCall.nextMessageToSend]; const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
switch (bufferEntry.entryType) { switch (bufferEntry.entryType) {
case 'MESSAGE': case 'MESSAGE':
childCall.call.sendMessageWithContext({ childCall.call.sendMessageWithContext({
callback: (error) => { callback: (error) => {
// Ignore error // Ignore error
@ -594,13 +599,13 @@ export class RetryingCall implements Call {
message, message,
flags: context.flags, flags: context.flags,
}; };
const messageIndex = this.writeBuffer.length; const messageIndex = this.getNextBufferIndex();
const bufferEntry: WriteBufferEntry = { const bufferEntry: WriteBufferEntry = {
entryType: 'MESSAGE', entryType: 'MESSAGE',
message: writeObj, message: writeObj,
allocated: this.bufferTracker.allocate(message.length, this.callNumber) allocated: this.bufferTracker.allocate(message.length, this.callNumber)
}; };
this.writeBuffer[messageIndex] = bufferEntry; this.writeBuffer.push(bufferEntry);
if (bufferEntry.allocated) { if (bufferEntry.allocated) {
context.callback?.(); context.callback?.();
for (const [callIndex, call] of this.underlyingCalls.entries()) { for (const [callIndex, call] of this.underlyingCalls.entries()) {
@ -642,11 +647,11 @@ export class RetryingCall implements Call {
} }
halfClose(): void { halfClose(): void {
this.trace('halfClose called'); this.trace('halfClose called');
const halfCloseIndex = this.writeBuffer.length; const halfCloseIndex = this.getNextBufferIndex();
this.writeBuffer[halfCloseIndex] = { this.writeBuffer.push({
entryType: 'HALF_CLOSE', entryType: 'HALF_CLOSE',
allocated: false allocated: false
}; });
for (const call of this.underlyingCalls) { for (const call of this.underlyingCalls) {
if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) { if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) {
call.nextMessageToSend += 1; call.nextMessageToSend += 1;