mirror of https://github.com/grpc/grpc-node.git
				
				
				
			Merge pull request #468 from murgatroid99/pure_js_naive_keepalive
Pure js: add simple keepalive logic
This commit is contained in:
		
						commit
						1af773ca54
					
				|  | @ -17,7 +17,7 @@ | |||
|   "devDependencies": { | ||||
|     "@types/lodash": "^4.14.108", | ||||
|     "@types/mocha": "^2.2.43", | ||||
|     "@types/node": "^9.4.6", | ||||
|     "@types/node": "^10.5.4", | ||||
|     "clang-format": "^1.0.55", | ||||
|     "gts": "^0.5.1", | ||||
|     "typescript": "~2.7.0" | ||||
|  |  | |||
|  | @ -13,6 +13,7 @@ import {DeadlineFilterFactory} from './deadline-filter'; | |||
| import {FilterStackFactory} from './filter-stack'; | ||||
| import {Metadata, MetadataObject} from './metadata'; | ||||
| import {MetadataStatusFilterFactory} from './metadata-status-filter'; | ||||
| import { Http2SubChannel } from './subchannel'; | ||||
| 
 | ||||
| const {version: clientVersion} = require('../../package'); | ||||
| 
 | ||||
|  | @ -42,6 +43,8 @@ export interface ChannelOptions { | |||
|   'grpc.primary_user_agent': string; | ||||
|   'grpc.secondary_user_agent': string; | ||||
|   'grpc.default_authority': string; | ||||
|   'grpc.keepalive_time_ms': number; | ||||
|   'grpc.keepalive_timeout_ms': number; | ||||
|   [key: string]: string|number; | ||||
| } | ||||
| 
 | ||||
|  | @ -82,15 +85,6 @@ export interface Channel extends EventEmitter { | |||
|   /* tslint:enable:no-any */ | ||||
| } | ||||
| 
 | ||||
| /* This should be a real subchannel class that contains a ClientHttp2Session, | ||||
|  * but for now this serves its purpose */ | ||||
| type Http2SubChannel = http2.ClientHttp2Session&{ | ||||
|   /* Count the number of currently active streams associated with the session. | ||||
|    * The purpose of this is to keep the session reffed if and only if there | ||||
|    * is at least one active stream */ | ||||
|   streamCount?: number; | ||||
| }; | ||||
| 
 | ||||
| export class Http2Channel extends EventEmitter implements Channel { | ||||
|   private readonly userAgent: string; | ||||
|   private readonly target: url.URL; | ||||
|  | @ -169,14 +163,10 @@ export class Http2Channel extends EventEmitter implements Channel { | |||
|   } | ||||
| 
 | ||||
|   private startConnecting(): void { | ||||
|     let subChannel: Http2SubChannel; | ||||
|     const secureContext = this.credentials.getSecureContext(); | ||||
|     if (secureContext === null) { | ||||
|       subChannel = http2.connect(this.target); | ||||
|     } else { | ||||
|       const connectionOptions: http2.SecureClientSessionOptions = { | ||||
|         secureContext, | ||||
|       }; | ||||
|     let connectionOptions: http2.SecureClientSessionOptions = {}; | ||||
|     if (secureContext !== null) { | ||||
|       connectionOptions.secureContext = secureContext; | ||||
|       // If provided, the value of grpc.ssl_target_name_override should be used
 | ||||
|       // to override the target hostname when checking server identity.
 | ||||
|       // This option is used for testing only.
 | ||||
|  | @ -189,8 +179,8 @@ export class Http2Channel extends EventEmitter implements Channel { | |||
|             }; | ||||
|         connectionOptions.servername = sslTargetNameOverride; | ||||
|       } | ||||
|       subChannel = http2.connect(this.target, connectionOptions); | ||||
|     } | ||||
|     const subChannel: Http2SubChannel = new Http2SubChannel(this.target, connectionOptions, this.userAgent, this.options); | ||||
|     this.subChannel = subChannel; | ||||
|     const now = new Date(); | ||||
|     const connectionTimeout: number = Math.max( | ||||
|  | @ -218,7 +208,6 @@ export class Http2Channel extends EventEmitter implements Channel { | |||
|           ConnectivityState.TRANSIENT_FAILURE); | ||||
|     }; | ||||
|     subChannel.once('close', this.subChannelCloseCallback); | ||||
|     subChannel.once('error', this.subChannelCloseCallback); | ||||
|   } | ||||
| 
 | ||||
|   constructor( | ||||
|  | @ -244,7 +233,6 @@ export class Http2Channel extends EventEmitter implements Channel { | |||
|     /* The only purpose of these lines is to ensure that this.backoffTimerId has | ||||
|      * a value of type NodeJS.Timer. */ | ||||
|     this.backoffTimerId = setTimeout(() => {}, 0); | ||||
|     clearTimeout(this.backoffTimerId); | ||||
| 
 | ||||
|     // Build user-agent string.
 | ||||
|     this.userAgent = [ | ||||
|  | @ -268,25 +256,8 @@ export class Http2Channel extends EventEmitter implements Channel { | |||
|           headers[HTTP2_HEADER_PATH] = methodName; | ||||
|           headers[HTTP2_HEADER_TE] = 'trailers'; | ||||
|           if (this.connectivityState === ConnectivityState.READY) { | ||||
|             const session: Http2SubChannel = this.subChannel!; | ||||
|             let http2Stream = session.request(headers); | ||||
|             /* This is a very ad-hoc reference counting scheme. This should be | ||||
|              * handled by a subchannel class */ | ||||
|             session.ref(); | ||||
|             if (!session.streamCount) { | ||||
|               session.streamCount = 0; | ||||
|             } | ||||
|             session.streamCount += 1; | ||||
|             http2Stream.on('close', () => { | ||||
|               if (!session.streamCount) { | ||||
|                 session.streamCount = 0; | ||||
|               } | ||||
|               session.streamCount -= 1; | ||||
|               if (session.streamCount <= 0) { | ||||
|                 session.unref(); | ||||
|               } | ||||
|             }); | ||||
|             stream.attachHttp2Stream(http2Stream); | ||||
|             const subChannel: Http2SubChannel = this.subChannel!; | ||||
|             subChannel.startCallStream(metadataValue, stream); | ||||
|           } else { | ||||
|             /* In this case, we lost the connection while finalizing | ||||
|              * metadata. That should be very unusual */ | ||||
|  |  | |||
|  | @ -0,0 +1,134 @@ | |||
| import * as http2 from 'http2'; | ||||
| import * as url from 'url'; | ||||
| 
 | ||||
| import { EventEmitter } from "events"; | ||||
| import { Metadata } from "./metadata"; | ||||
| import { CallStream, CallOptions, Http2CallStream } from "./call-stream"; | ||||
| import { EmitterAugmentation1, EmitterAugmentation0 } from "./events"; | ||||
| import { ChannelOptions } from './channel'; | ||||
| 
 | ||||
| const { | ||||
|   HTTP2_HEADER_AUTHORITY, | ||||
|   HTTP2_HEADER_CONTENT_TYPE, | ||||
|   HTTP2_HEADER_METHOD, | ||||
|   HTTP2_HEADER_PATH, | ||||
|   HTTP2_HEADER_SCHEME, | ||||
|   HTTP2_HEADER_TE, | ||||
|   HTTP2_HEADER_USER_AGENT | ||||
| } = http2.constants; | ||||
| 
 | ||||
| /* setInterval and setTimeout only accept signed 32 bit integers. JS doesn't | ||||
|  * have a constant for the max signed 32 bit integer, so this is a simple way | ||||
|  * to calculate it */ | ||||
| const KEEPALIVE_TIME_MS = ~(1 << 31); | ||||
| const KEEPALIVE_TIMEOUT_MS = 20000; | ||||
| 
 | ||||
| export interface SubChannel extends EventEmitter { | ||||
|   /** | ||||
|    * Attach a call stream to this subchannel's connection to start it | ||||
|    * @param headers The headers to start the stream with | ||||
|    * @param callStream The stream to start | ||||
|    */ | ||||
|   startCallStream(metadata: Metadata, callStream: CallStream): void; | ||||
|   close(): void; | ||||
| } | ||||
| 
 | ||||
| export class Http2SubChannel extends EventEmitter implements SubChannel { | ||||
|   private session: http2.ClientHttp2Session; | ||||
|   private refCount: number = 0; | ||||
|   private userAgent: string; | ||||
| 
 | ||||
|   private keepaliveTimeMs: number = KEEPALIVE_TIME_MS; | ||||
|   private keepaliveTimeoutMs: number = KEEPALIVE_TIMEOUT_MS; | ||||
|   private keepaliveIntervalId: NodeJS.Timer; | ||||
|   private keepaliveTimeoutId: NodeJS.Timer; | ||||
| 
 | ||||
|   constructor(target: url.URL, connectionOptions: http2.SecureClientSessionOptions, | ||||
|               userAgent: string, channelArgs: Partial<ChannelOptions>) { | ||||
|     super(); | ||||
|     this.session = http2.connect(target, connectionOptions); | ||||
|     this.session.on('connect', () => { | ||||
|       this.emit('connect'); | ||||
|     }); | ||||
|     this.session.on('close', () => { | ||||
|       this.stopKeepalivePings(); | ||||
|       this.emit('close'); | ||||
|     }); | ||||
|     this.session.on('error', () => { | ||||
|       this.stopKeepalivePings(); | ||||
|       this.emit('close'); | ||||
|     }) | ||||
|     this.userAgent = userAgent; | ||||
| 
 | ||||
|     if (channelArgs['grpc.keepalive_time_ms']) { | ||||
|       this.keepaliveTimeMs = channelArgs['grpc.keepalive_time_ms']!; | ||||
|     } | ||||
|     if (channelArgs['grpc.keepalive_timeout_ms']) { | ||||
|       this.keepaliveTimeoutMs = channelArgs['grpc.keepalive_timeout_ms']!; | ||||
|     } | ||||
|     this.keepaliveIntervalId = setTimeout(() => {}, 0); | ||||
|     clearTimeout(this.keepaliveIntervalId); | ||||
|     this.keepaliveTimeoutId = setTimeout(() => {}, 0); | ||||
|     clearTimeout(this.keepaliveTimeoutId); | ||||
|   } | ||||
| 
 | ||||
|   private ref() { | ||||
|     if (this.refCount === 0) { | ||||
|       this.session.ref(); | ||||
|       this.startKeepalivePings(); | ||||
|     } | ||||
|     this.refCount += 1; | ||||
|   } | ||||
| 
 | ||||
|   private unref() { | ||||
|     this.refCount -= 1; | ||||
|     if (this.refCount === 0) { | ||||
|       this.session.unref(); | ||||
|       this.stopKeepalivePings(); | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   private sendPing() { | ||||
|     this.keepaliveTimeoutId = setTimeout(() => { | ||||
|       this.emit('close'); | ||||
|     }, this.keepaliveTimeoutMs); | ||||
|     this.session.ping((err: Error | null, duration: number, payload: Buffer) => { | ||||
|       clearTimeout(this.keepaliveTimeoutId); | ||||
|     }); | ||||
|   } | ||||
| 
 | ||||
|   /* TODO(murgatroid99): refactor subchannels so that keepalives can be handled | ||||
|    * per subchannel */ | ||||
|   private startKeepalivePings() { | ||||
|     this.keepaliveIntervalId = setInterval(() => { | ||||
|       this.sendPing(); | ||||
|     }, this.keepaliveTimeMs); | ||||
|     this.sendPing(); | ||||
|   } | ||||
| 
 | ||||
|   private stopKeepalivePings() { | ||||
|     clearInterval(this.keepaliveIntervalId); | ||||
|     clearTimeout(this.keepaliveTimeoutId); | ||||
|   } | ||||
| 
 | ||||
|   // Prerequisite: this subchannel is connected
 | ||||
|   startCallStream(metadata: Metadata, callStream: Http2CallStream) { | ||||
|     const headers = metadata.toHttp2Headers(); | ||||
|     headers[HTTP2_HEADER_AUTHORITY] = callStream.getHost(); | ||||
|     headers[HTTP2_HEADER_USER_AGENT] = this.userAgent; | ||||
|     headers[HTTP2_HEADER_CONTENT_TYPE] = 'application/grpc'; | ||||
|     headers[HTTP2_HEADER_METHOD] = 'POST'; | ||||
|     headers[HTTP2_HEADER_PATH] = callStream.getMethod(); | ||||
|     headers[HTTP2_HEADER_TE] = 'trailers'; | ||||
|     let http2Stream = this.session.request(headers); | ||||
|     this.ref(); | ||||
|     http2Stream.on('close', () => { | ||||
|       this.unref(); | ||||
|     }); | ||||
|     callStream.attachHttp2Stream(http2Stream); | ||||
|   } | ||||
|    | ||||
|   close() { | ||||
|     this.session.close(); | ||||
|   } | ||||
| } | ||||
		Loading…
	
		Reference in New Issue