mirror of https://github.com/grpc/grpc-node.git
Add forced completion queue poll to ensure that waitForReady uses current connectivity state
This commit is contained in:
parent
296897f152
commit
661bdfaa5d
|
|
@ -35,7 +35,7 @@ grpc_completion_queue *queue;
|
||||||
uv_prepare_t prepare;
|
uv_prepare_t prepare;
|
||||||
int pending_batches;
|
int pending_batches;
|
||||||
|
|
||||||
void drain_completion_queue(uv_prepare_t *handle) {
|
static void drain_completion_queue(uv_prepare_t *handle) {
|
||||||
Nan::HandleScope scope;
|
Nan::HandleScope scope;
|
||||||
grpc_event event;
|
grpc_event event;
|
||||||
(void)handle;
|
(void)handle;
|
||||||
|
|
@ -53,9 +53,9 @@ void drain_completion_queue(uv_prepare_t *handle) {
|
||||||
CompleteTag(event.tag, error_message);
|
CompleteTag(event.tag, error_message);
|
||||||
grpc::node::DestroyTag(event.tag);
|
grpc::node::DestroyTag(event.tag);
|
||||||
pending_batches--;
|
pending_batches--;
|
||||||
if (pending_batches == 0) {
|
}
|
||||||
uv_prepare_stop(&prepare);
|
if (pending_batches == 0) {
|
||||||
}
|
uv_prepare_stop(&prepare);
|
||||||
}
|
}
|
||||||
} while (event.type != GRPC_QUEUE_TIMEOUT);
|
} while (event.type != GRPC_QUEUE_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
@ -76,5 +76,15 @@ void CompletionQueueInit(Local<Object> exports) {
|
||||||
pending_batches = 0;
|
pending_batches = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CompletionQueueForcePoll() {
|
||||||
|
/* This sets the prepare object to poll on the completion queue the next time
|
||||||
|
* Node polls for IO. But it doesn't increment the number of pending batches,
|
||||||
|
* so it will immediately stop polling after that unless there is an
|
||||||
|
* intervening CompletionQueueNext call */
|
||||||
|
if (pending_batches == 0) {
|
||||||
|
uv_prepare_start(&prepare, drain_completion_queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace node
|
} // namespace node
|
||||||
} // namespace grpc
|
} // namespace grpc
|
||||||
|
|
|
||||||
|
|
@ -265,6 +265,10 @@ NAN_METHOD(SetLogVerbosity) {
|
||||||
gpr_set_log_verbosity(severity);
|
gpr_set_log_verbosity(severity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NAN_METHOD(ForcePoll) {
|
||||||
|
grpc::node::CompletionQueueForcePoll();
|
||||||
|
}
|
||||||
|
|
||||||
void init(Local<Object> exports) {
|
void init(Local<Object> exports) {
|
||||||
Nan::HandleScope scope;
|
Nan::HandleScope scope;
|
||||||
grpc_init();
|
grpc_init();
|
||||||
|
|
@ -306,6 +310,9 @@ void init(Local<Object> exports) {
|
||||||
Nan::Set(exports, Nan::New("setLogVerbosity").ToLocalChecked(),
|
Nan::Set(exports, Nan::New("setLogVerbosity").ToLocalChecked(),
|
||||||
Nan::GetFunction(Nan::New<FunctionTemplate>(SetLogVerbosity))
|
Nan::GetFunction(Nan::New<FunctionTemplate>(SetLogVerbosity))
|
||||||
.ToLocalChecked());
|
.ToLocalChecked());
|
||||||
|
Nan::Set(exports, Nan::New("forcePoll").ToLocalChecked(),
|
||||||
|
Nan::GetFunction(Nan::New<FunctionTemplate>(ForcePoll))
|
||||||
|
.ToLocalChecked());
|
||||||
}
|
}
|
||||||
|
|
||||||
NODE_MODULE(grpc_node, init)
|
NODE_MODULE(grpc_node, init)
|
||||||
|
|
|
||||||
|
|
@ -800,7 +800,10 @@ Client.prototype.waitForReady = function(deadline, callback) {
|
||||||
self.$channel.watchConnectivityState(new_state, deadline, checkState);
|
self.$channel.watchConnectivityState(new_state, deadline, checkState);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
checkState();
|
/* Force a single round of polling to ensure that the channel state is up
|
||||||
|
* to date */
|
||||||
|
grpc.forcePoll();
|
||||||
|
setImmediate(checkState);
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue