diff --git a/packages/grpc-js/package.json b/packages/grpc-js/package.json index 0ae2d674..f75f780d 100644 --- a/packages/grpc-js/package.json +++ b/packages/grpc-js/package.json @@ -1,6 +1,6 @@ { "name": "@grpc/grpc-js", - "version": "1.8.10", + "version": "1.8.11", "description": "gRPC Library for Node - pure JS implementation", "homepage": "https://grpc.io/", "repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js", diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts index a9424373..5ae585b9 100644 --- a/packages/grpc-js/src/retrying-call.ts +++ b/packages/grpc-js/src/retrying-call.ts @@ -151,6 +151,12 @@ export class RetryingCall implements Call { private initialMetadata: Metadata | null = null; private underlyingCalls: UnderlyingCall[] = []; private writeBuffer: WriteBufferEntry[] = []; + /** + * The offset of message indices in the writeBuffer. For example, if + * writeBufferOffset is 10, message 10 is in writeBuffer[0] and message 15 + * is in writeBuffer[5]. + */ + private writeBufferOffset = 0; /** * Tracks whether a read has been started, so that we know whether to start * reads on new child calls. This only matters for the first read, because @@ -203,14 +209,8 @@ export class RetryingCall implements Call { private reportStatus(statusObject: StatusObject) { this.trace('ended with status: code=' + statusObject.code + ' details="' + statusObject.details + '"'); this.bufferTracker.freeAll(this.callNumber); - for (let i = 0; i < this.writeBuffer.length; i++) { - if (this.writeBuffer[i].entryType === 'MESSAGE') { - this.writeBuffer[i] = { - entryType: 'FREED', - allocated: false - }; - } - } + this.writeBufferOffset = this.writeBufferOffset + this.writeBuffer.length; + this.writeBuffer = []; process.nextTick(() => { // Explicitly construct status object to remove progress field this.listener?.onReceiveStatus({ @@ -236,20 +236,27 @@ export class RetryingCall implements Call { } } - private maybefreeMessageBufferEntry(messageIndex: number) { + private getBufferEntry(messageIndex: number): WriteBufferEntry { + return this.writeBuffer[messageIndex - this.writeBufferOffset] ?? {entryType: 'FREED', allocated: false}; + } + + private getNextBufferIndex() { + return this.writeBufferOffset + this.writeBuffer.length; + } + + private clearSentMessages() { if (this.state !== 'COMMITTED') { return; } - const bufferEntry = this.writeBuffer[messageIndex]; - if (bufferEntry.entryType === 'MESSAGE') { + const earliestNeededMessageIndex = this.underlyingCalls[this.committedCallIndex!].nextMessageToSend; + for (let messageIndex = this.writeBufferOffset; messageIndex < earliestNeededMessageIndex; messageIndex++) { + const bufferEntry = this.getBufferEntry(messageIndex); if (bufferEntry.allocated) { this.bufferTracker.free(bufferEntry.message!.message.length, this.callNumber); } - this.writeBuffer[messageIndex] = { - entryType: 'FREED', - allocated: false - }; } + this.writeBuffer = this.writeBuffer.slice(earliestNeededMessageIndex - this.writeBufferOffset); + this.writeBufferOffset = earliestNeededMessageIndex; } private commitCall(index: number) { @@ -272,9 +279,7 @@ export class RetryingCall implements Call { this.underlyingCalls[i].state = 'COMPLETED'; this.underlyingCalls[i].call.cancelWithStatus(Status.CANCELLED, 'Discarded in favor of other hedged attempt'); } - for (let messageIndex = 0; messageIndex < this.underlyingCalls[index].nextMessageToSend - 1; messageIndex += 1) { - this.maybefreeMessageBufferEntry(messageIndex); - } + this.clearSentMessages(); } private commitCallWithMostMessages() { @@ -555,8 +560,8 @@ export class RetryingCall implements Call { private handleChildWriteCompleted(childIndex: number) { const childCall = this.underlyingCalls[childIndex]; const messageIndex = childCall.nextMessageToSend; - this.writeBuffer[messageIndex].callback?.(); - this.maybefreeMessageBufferEntry(messageIndex); + this.getBufferEntry(messageIndex).callback?.(); + this.clearSentMessages(); childCall.nextMessageToSend += 1; this.sendNextChildMessage(childIndex); } @@ -566,10 +571,10 @@ export class RetryingCall implements Call { if (childCall.state === 'COMPLETED') { return; } - if (this.writeBuffer[childCall.nextMessageToSend]) { - const bufferEntry = this.writeBuffer[childCall.nextMessageToSend]; + if (this.getBufferEntry(childCall.nextMessageToSend)) { + const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend); switch (bufferEntry.entryType) { - case 'MESSAGE': + case 'MESSAGE': childCall.call.sendMessageWithContext({ callback: (error) => { // Ignore error @@ -594,13 +599,13 @@ export class RetryingCall implements Call { message, flags: context.flags, }; - const messageIndex = this.writeBuffer.length; + const messageIndex = this.getNextBufferIndex(); const bufferEntry: WriteBufferEntry = { entryType: 'MESSAGE', message: writeObj, allocated: this.bufferTracker.allocate(message.length, this.callNumber) }; - this.writeBuffer[messageIndex] = bufferEntry; + this.writeBuffer.push(bufferEntry); if (bufferEntry.allocated) { context.callback?.(); for (const [callIndex, call] of this.underlyingCalls.entries()) { @@ -642,11 +647,11 @@ export class RetryingCall implements Call { } halfClose(): void { this.trace('halfClose called'); - const halfCloseIndex = this.writeBuffer.length; - this.writeBuffer[halfCloseIndex] = { + const halfCloseIndex = this.getNextBufferIndex(); + this.writeBuffer.push({ entryType: 'HALF_CLOSE', allocated: false - }; + }); for (const call of this.underlyingCalls) { if (call?.state === 'ACTIVE' && call.nextMessageToSend === halfCloseIndex) { call.nextMessageToSend += 1;