Add RPC server processing metrics and rename client RPC metrics
This commit is contained in:
		
							parent
							
								
									48bd80d14e
								
							
						
					
					
						commit
						88b8eb3480
					
				|  | @ -439,12 +439,16 @@ func (rpc *AmqpRPCServer) Start(c cmd.Config) error { | |||
| 				rpc.stats.TimingDuration(fmt.Sprintf("RPC.MessageLag.%s", rpc.serverQueue), rpc.clk.Now().Sub(msg.Timestamp), 1.0) | ||||
| 				if rpc.maxConcurrentRPCServerRequests > 0 && atomic.LoadInt64(&rpc.currentGoroutines) >= rpc.maxConcurrentRPCServerRequests { | ||||
| 					rpc.replyTooManyRequests(msg) | ||||
| 					rpc.stats.Inc(fmt.Sprintf("RPC.CallsDropped.%s", rpc.serverQueue), 1, 1.0) | ||||
| 					break // this breaks the select, not the for
 | ||||
| 				} | ||||
| 				rpc.stats.Inc(fmt.Sprintf("RPC.Traffic.Rx.%s", rpc.serverQueue), int64(len(msg.Body)), 1.0) | ||||
| 				go func() { | ||||
| 					atomic.AddInt64(&rpc.currentGoroutines, 1) | ||||
| 					defer atomic.AddInt64(&rpc.currentGoroutines, -1) | ||||
| 					startedProcessing := rpc.clk.Now() | ||||
| 					rpc.processMessage(msg) | ||||
| 					rpc.stats.TimingDuration(fmt.Sprintf("RPC.ServerProcessingLatency.%s", msg.Type), time.Since(startedProcessing), 1.0) | ||||
| 				}() | ||||
| 			} else { | ||||
| 				rpc.mu.RLock() | ||||
|  | @ -634,10 +638,7 @@ func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) (string, chan []b | |||
| 
 | ||||
| // DispatchSync sends a body to the destination, and blocks waiting on a response.
 | ||||
| func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []byte, err error) { | ||||
| 	rpc.stats.Inc(fmt.Sprintf("RPC.Rate.%s", method), 1, 1.0) | ||||
| 	rpc.stats.Inc("RPC.Traffic", int64(len(body)), 1.0) | ||||
| 	rpc.stats.GaugeDelta("RPC.CallsWaiting", 1, 1.0) | ||||
| 	defer rpc.stats.GaugeDelta("RPC.CallsWaiting", -1, 1.0) | ||||
| 	rpc.stats.Inc(fmt.Sprintf("RPC.Traffic.Tx.%s", rpc.serverQueue), int64(len(body)), 1.0) | ||||
| 	callStarted := time.Now() | ||||
| 	corrID, responseChan := rpc.dispatch(method, body) | ||||
| 	select { | ||||
|  | @ -649,15 +650,15 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b | |||
| 		} | ||||
| 		err = unwrapError(rpcResponse.Error) | ||||
| 		if err != nil { | ||||
| 			rpc.stats.Inc(fmt.Sprintf("RPC.Latency.%s.Error", method), 1, 1.0) | ||||
| 			rpc.stats.Inc(fmt.Sprintf("RPC.ClientCallLatency.%s.Error", method), 1, 1.0) | ||||
| 			return | ||||
| 		} | ||||
| 		rpc.stats.Inc("RPC.Rate.Success", 1, 1.0) | ||||
| 		rpc.stats.TimingDuration(fmt.Sprintf("RPC.Latency.%s.Success", method), time.Since(callStarted), 1.0) | ||||
| 		rpc.stats.TimingDuration(fmt.Sprintf("RPC.ClientCallLatency.%s.Success", method), time.Since(callStarted), 1.0) | ||||
| 		response = rpcResponse.ReturnVal | ||||
| 		return | ||||
| 	case <-time.After(rpc.timeout): | ||||
| 		rpc.stats.TimingDuration(fmt.Sprintf("RPC.Latency.%s.Timeout", method), time.Since(callStarted), 1.0) | ||||
| 		rpc.stats.TimingDuration(fmt.Sprintf("RPC.ClientCallLatency.%s.Timeout", method), time.Since(callStarted), 1.0) | ||||
| 		rpc.stats.Inc("RPC.Rate.Timeouts", 1, 1.0) | ||||
| 		rpc.log.Warning(fmt.Sprintf(" [c!][%s] AMQP-RPC timeout [%s]", rpc.clientQueue, method)) | ||||
| 		rpc.mu.Lock() | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue