RPC debug logs don't use base64.

This decreases log size 27% in a simple test, and more importantly makes the
logs human readable and searchable.
Fixes #1031

Also, change corrID to an 8-byte value and break dependency on NewToken().
Fixes #909
This commit is contained in:
Jacob Hoffman-Andrews 2015-11-19 17:59:09 -08:00
parent 584dae7437
commit bdd3247f47
1 changed files with 67 additions and 42 deletions

View File

@ -9,6 +9,7 @@ import (
"crypto/rand"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
@ -219,79 +220,101 @@ type rpcError struct {
// Wraps a error in a rpcError so it can be marshalled to
// JSON.
func wrapError(err error) (rpcError rpcError) {
func wrapError(err error) *rpcError {
if err != nil {
rpcError.Value = err.Error()
wrapped := &rpcError{
Value: err.Error(),
}
switch err.(type) {
case core.InternalServerError:
rpcError.Type = "InternalServerError"
wrapped.Type = "InternalServerError"
case core.NotSupportedError:
rpcError.Type = "NotSupportedError"
wrapped.Type = "NotSupportedError"
case core.MalformedRequestError:
rpcError.Type = "MalformedRequestError"
wrapped.Type = "MalformedRequestError"
case core.UnauthorizedError:
rpcError.Type = "UnauthorizedError"
wrapped.Type = "UnauthorizedError"
case core.NotFoundError:
rpcError.Type = "NotFoundError"
wrapped.Type = "NotFoundError"
case core.SyntaxError:
rpcError.Type = "SyntaxError"
wrapped.Type = "SyntaxError"
case core.SignatureValidationError:
rpcError.Type = "SignatureValidationError"
wrapped.Type = "SignatureValidationError"
case core.CertificateIssuanceError:
rpcError.Type = "CertificateIssuanceError"
wrapped.Type = "CertificateIssuanceError"
case core.NoSuchRegistrationError:
rpcError.Type = "NoSuchRegistrationError"
wrapped.Type = "NoSuchRegistrationError"
case core.TooManyRPCRequestsError:
rpcError.Type = "TooManyRPCRequestsError"
wrapped.Type = "TooManyRPCRequestsError"
case core.RateLimitedError:
rpcError.Type = "RateLimitedError"
wrapped.Type = "RateLimitedError"
case core.ServiceUnavailableError:
rpcError.Type = "ServiceUnavailableError"
wrapped.Type = "ServiceUnavailableError"
}
return wrapped
}
return
return nil
}
// Unwraps a rpcError and returns the correct error type.
func unwrapError(rpcError rpcError) (err error) {
if rpcError.Value != "" {
func unwrapError(rpcError *rpcError) error {
if rpcError != nil {
switch rpcError.Type {
case "InternalServerError":
err = core.InternalServerError(rpcError.Value)
return core.InternalServerError(rpcError.Value)
case "NotSupportedError":
err = core.NotSupportedError(rpcError.Value)
return core.NotSupportedError(rpcError.Value)
case "MalformedRequestError":
err = core.MalformedRequestError(rpcError.Value)
return core.MalformedRequestError(rpcError.Value)
case "UnauthorizedError":
err = core.UnauthorizedError(rpcError.Value)
return core.UnauthorizedError(rpcError.Value)
case "NotFoundError":
err = core.NotFoundError(rpcError.Value)
return core.NotFoundError(rpcError.Value)
case "SyntaxError":
err = core.SyntaxError(rpcError.Value)
return core.SyntaxError(rpcError.Value)
case "SignatureValidationError":
err = core.SignatureValidationError(rpcError.Value)
return core.SignatureValidationError(rpcError.Value)
case "CertificateIssuanceError":
err = core.CertificateIssuanceError(rpcError.Value)
return core.CertificateIssuanceError(rpcError.Value)
case "NoSuchRegistrationError":
err = core.NoSuchRegistrationError(rpcError.Value)
return core.NoSuchRegistrationError(rpcError.Value)
case "TooManyRPCRequestsError":
err = core.TooManyRPCRequestsError(rpcError.Value)
return core.TooManyRPCRequestsError(rpcError.Value)
case "RateLimitedError":
err = core.RateLimitedError(rpcError.Value)
return core.RateLimitedError(rpcError.Value)
case "ServiceUnavailableError":
err = core.ServiceUnavailableError(rpcError.Value)
return core.ServiceUnavailableError(rpcError.Value)
default:
err = errors.New(rpcError.Value)
return errors.New(rpcError.Value)
}
}
return
return nil
}
// rpcResponse is a stuct for wire-representation of response messages
// used by DispatchSync
type rpcResponse struct {
ReturnVal []byte `json:"returnVal,omitempty"`
Error rpcError `json:"error,omitempty"`
ReturnVal []byte `json:"returnVal"`
Error *rpcError `json:"error,omitempty"`
}
// Hack: Some of our RPCs return DER directly. If we log it naively it will
// just be a bunch of numbers. It's easy to detect DER, so we use this function
// before logging to base64-encode anything that looks like DER.
func safeDER(input []byte) string {
if len(input) > 0 && input[0] == 0x30 {
return string(base64.RawStdEncoding.EncodeToString(input))
}
return string(input)
}
// Used for debug logging
func (r rpcResponse) debugString() string {
ret := safeDER(r.ReturnVal)
if r.Error == nil {
return ret
}
return fmt.Sprintf("%s, RPCERR: %s", ret, r.Error)
}
// AmqpChannel sets a AMQP connection up using SSL if configuration is provided
@ -370,10 +393,10 @@ func AmqpChannel(conf cmd.Config) (*amqp.Channel, error) {
func (rpc *AmqpRPCServer) processMessage(msg amqp.Delivery) {
// XXX-JWS: jws.Verify(body)
cb, present := rpc.dispatchTable[msg.Type]
rpc.log.Info(fmt.Sprintf(" [s<][%s][%s] received %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(msg.Body), msg.CorrelationId))
rpc.log.Info(fmt.Sprintf(" [s<][%s][%s] received %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, safeDER(msg.Body), msg.CorrelationId))
if !present {
// AUDIT[ Misrouted Messages ] f523f21f-12d2-4c31-b2eb-ee4b7d96d60e
rpc.log.Audit(fmt.Sprintf(" [s<][%s][%s] Misrouted message: %s - %s - %s", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(msg.Body), msg.CorrelationId))
rpc.log.Audit(fmt.Sprintf(" [s<][%s][%s] Misrouted message: %s - %s - %s", rpc.serverQueue, msg.ReplyTo, msg.Type, safeDER(msg.Body), msg.CorrelationId))
return
}
var response rpcResponse
@ -386,10 +409,7 @@ func (rpc *AmqpRPCServer) processMessage(msg amqp.Delivery) {
rpc.log.Audit(fmt.Sprintf(" [s>][%s][%s] Error condition marshalling RPC response %s [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, msg.CorrelationId))
return
}
if response.Error.Value != "" {
rpc.log.Info(fmt.Sprintf(" [s>][%s][%s] %s failed, replying: %s (%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, response.Error.Value, response.Error.Type, msg.CorrelationId))
}
rpc.log.Debug(fmt.Sprintf(" [s>][%s][%s] replying %s(%s) [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, core.B64enc(jsonResponse), msg.CorrelationId))
rpc.log.Debug(fmt.Sprintf(" [s>][%s][%s] replying %s: %s [%s]", rpc.serverQueue, msg.ReplyTo, msg.Type, response.debugString(), msg.CorrelationId))
rpc.connection.publish(
msg.ReplyTo,
msg.CorrelationId,
@ -584,7 +604,6 @@ func NewAmqpRPCClient(clientQueuePrefix, serverQueue string, c cmd.Config, stats
continue
}
rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s(%s) [%s]", clientQueue, msg.Type, core.B64enc(msg.Body), corrID))
responseChan <- msg.Body
rpc.mu.Lock()
delete(rpc.pending, corrID)
@ -618,13 +637,18 @@ func (rpc *AmqpRPCCLient) dispatch(method string, body []byte) (string, chan []b
// At least in some cases, it's important that this channel
// be buffered to avoid deadlock
responseChan := make(chan []byte, 1)
corrID := core.NewToken()
corrIDBytes := make([]byte, 8)
_, err := rand.Read(corrIDBytes)
if err != nil {
panic("randomness failed")
}
corrID := base64.RawURLEncoding.EncodeToString(corrIDBytes)
rpc.mu.Lock()
rpc.pending[corrID] = responseChan
rpc.mu.Unlock()
// Send the request
rpc.log.Debug(fmt.Sprintf(" [c>][%s] requesting %s(%s) [%s]", rpc.clientQueue, method, core.B64enc(body), corrID))
rpc.log.Debug(fmt.Sprintf(" [c>][%s] requesting %s(%s) [%s]", rpc.clientQueue, method, safeDER(body), corrID))
rpc.connection.publish(
rpc.serverQueue,
corrID,
@ -645,6 +669,7 @@ func (rpc *AmqpRPCCLient) DispatchSync(method string, body []byte) (response []b
case jsonResponse := <-responseChan:
var rpcResponse rpcResponse
err = json.Unmarshal(jsonResponse, &rpcResponse)
rpc.log.Debug(fmt.Sprintf(" [c<][%s] response %s: %s [%s]", rpc.clientQueue, method, rpcResponse.debugString(), corrID))
if err != nil {
return
}