Merge pull request #1051 from letsencrypt/rpc_gc_timeout

remove RPC response chan from pending on timeout
This commit is contained in:
Roland Bracewell Shoemaker 2015-10-28 20:27:10 -07:00
commit 7e372e6918
1 changed file with 15 additions and 9 deletions

View File

@ -562,12 +562,14 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats
responseChan, present := rpc.pending[corrID] responseChan, present := rpc.pending[corrID]
rpc.mu.RUnlock() rpc.mu.RUnlock()
rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID))
if !present { if !present {
// AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e // occurs when a request is timed out and the response arrives
rpc.log.Audit(fmt.Sprintf(" [c<][%s] Misrouted message: %s - %s - %s", clientQueue, msg.Type, core.B64enc(msg.Body), msg.CorrelationId)) // afterwards
stats.Inc("RPC.AfterTimeoutResponseArrivals."+clientQueuePrefix, 1, 1.0)
continue continue
} }
rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID))
responseChan <- msg.Body responseChan <- msg.Body
rpc.mu.Lock() rpc.mu.Lock()
delete(rpc.pending, corrID) delete(rpc.pending, corrID)
@ -593,10 +595,10 @@ func (rpc *AmqpRPCCLient) SetTimeout(ttl time.Duration) {
rpc.timeout = ttl rpc.timeout = ttl
} }
// dispatch sends a body to the destination, and returns a response channel // dispatch sends a body to the destination, and returns the id for the request
// that can be used to monitor for responses, or discarded for one-shot // that can be used to correlate it with responses, and a response channel that
// actions. // can be used to monitor for responses, or discarded for one-shot actions.
func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) chan []byte { func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) (string, chan []byte) {
// Create a channel on which to direct the response // Create a channel on which to direct the response
// At least in some cases, it's important that this channel // At least in some cases, it's important that this channel
// be buffered to avoid deadlock // be buffered to avoid deadlock
@ -616,7 +618,7 @@ func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) chan []byte {
method, method,
body) body)
return responseChan return corrID, responseChan
} }
// DispatchSync sends a body to the destination, and blocks waiting on a response. // DispatchSync sends a body to the destination, and blocks waiting on a response.
@ -626,8 +628,9 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b
rpc.stats.GaugeDelta("RPC.CallsWaiting", 1, 1.0) rpc.stats.GaugeDelta("RPC.CallsWaiting", 1, 1.0)
defer rpc.stats.GaugeDelta("RPC.CallsWaiting", -1, 1.0) defer rpc.stats.GaugeDelta("RPC.CallsWaiting", -1, 1.0)
callStarted := time.Now() callStarted := time.Now()
corrID, responseChan := rpc.dispatch(method, body)
select { select {
case jsonResponse := <-rpc.dispatch(method, body): case jsonResponse := <-responseChan:
var rpcResponse rpcResponse var rpcResponse rpcResponse
err = json.Unmarshal(jsonResponse, &rpcResponse) err = json.Unmarshal(jsonResponse, &rpcResponse)
if err != nil { if err != nil {
@ -646,6 +649,9 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b
rpc.stats.TimingDuration(fmt.Sprintf("RPC.Latency.%s.Timeout", method), time.Since(callStarted), 1.0) rpc.stats.TimingDuration(fmt.Sprintf("RPC.Latency.%s.Timeout", method), time.Since(callStarted), 1.0)
rpc.stats.Inc("RPC.Rate.Timeouts", 1, 1.0) rpc.stats.Inc("RPC.Rate.Timeouts", 1, 1.0)
rpc.log.Warning(fmt.Sprintf(" [c!][%s] AMQP-RPC timeout [%s]", rpc.clientQueue, method)) rpc.log.Warning(fmt.Sprintf(" [c!][%s] AMQP-RPC timeout [%s]", rpc.clientQueue, method))
rpc.mu.Lock()
delete(rpc.pending, corrID)
rpc.mu.Unlock()
err = errors.New("AMQP-RPC timeout") err = errors.New("AMQP-RPC timeout")
return return
} }