diff --git a/rpc/amqp-rpc.go b/rpc/amqp-rpc.go index 02327b758..4fdccd97c 100644 --- a/rpc/amqp-rpc.go +++ b/rpc/amqp-rpc.go @@ -562,12 +562,14 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats responseChan, present := rpc.pending[corrID] rpc.mu.RUnlock() - rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID)) if !present { - // AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e - rpc.log.Audit(fmt.Sprintf(" [c<][%s] Misrouted message: %s - %s - %s", clientQueue, msg.Type, core.B64enc(msg.Body), msg.CorrelationId)) + // occurs when a request is timed out and the arrives + // afterwards + stats.Inc("RPC.AfterTimeoutResponseArrivals."+clientQueuePrefix, 1, 1.0) continue } + + rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID)) responseChan <- msg.Body rpc.mu.Lock() delete(rpc.pending, corrID) @@ -593,10 +595,10 @@ func (rpc *AmqpRPCCLient) SetTimeout(ttl time.Duration) { rpc.timeout = ttl } -// dispatch sends a body to the destination, and returns a response channel -// that can be used to monitor for responses, or discarded for one-shot -// actions. -func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) chan []byte { +// dispatch sends a body to the destination, and returns the id for the request +// that can be used to correlate it with responses, and a response channel that +// can be used to monitor for responses, or discarded for one-shot actions. +func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) (string, chan []byte) { // Create a channel on which to direct the response // At least in some cases, it's important that this channel // be buffered to avoid deadlock @@ -616,7 +618,7 @@ func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) chan []byte { method, body) - return responseChan + return corrID, responseChan } // DispatchSync sends a body to the destination, and blocks waiting on a response. @@ -626,8 +628,9 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b rpc.stats.GaugeDelta("RPC.CallsWaiting", 1, 1.0) defer rpc.stats.GaugeDelta("RPC.CallsWaiting", -1, 1.0) callStarted := time.Now() + corrID, responseChan := rpc.dispatch(method, body) select { - case jsonResponse := <-rpc.dispatch(method, body): + case jsonResponse := <-responseChan: var rpcResponse rpcResponse err = json.Unmarshal(jsonResponse, &rpcResponse) if err != nil { @@ -646,6 +649,9 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b rpc.stats.TimingDuration(fmt.Sprintf("RPC.Latency.%s.Timeout", method), time.Since(callStarted), 1.0) rpc.stats.Inc("RPC.Rate.Timeouts", 1, 1.0) rpc.log.Warning(fmt.Sprintf(" [c!][%s] AMQP-RPC timeout [%s]", rpc.clientQueue, method)) + rpc.mu.Lock() + delete(rpc.pending, corrID) + rpc.mu.Unlock() err = errors.New("AMQP-RPC timeout") return }