Rework audit logging. Issue #23.

This commit is contained in:
J.C. Jones 2015-03-24 17:11:30 -07:00
parent e1f80a108f
commit 151274f1b3
9 changed files with 165 additions and 526 deletions

View File

@ -6,7 +6,10 @@
package analysisengine
import (
"github.com/letsencrypt/boulder/log"
blog "github.com/letsencrypt/boulder/log"
"encoding/json"
"fmt"
"github.com/streadway/amqp"
)
@ -15,20 +18,27 @@ import (
// Interface all Analysis Engines share
type AnalysisEngine interface {
ProcessMessage(amqp.Delivery)
ProcessMessage(amqp.Delivery) (err error)
}
// An Analysis Engine that just logs to the JSON Logger.
type LoggingAnalysisEngine struct {
jsonLogger *log.JSONLogger
log *blog.AuditLogger
}
func (eng *LoggingAnalysisEngine) ProcessMessage(delivery amqp.Delivery) {
func (eng *LoggingAnalysisEngine) ProcessMessage(delivery amqp.Delivery) (err error) {
// Send the entire message contents to the syslog server for debugging.
eng.jsonLogger.Debug("Message contents", delivery)
encoded, err := json.Marshal(delivery)
if err != nil {
return
}
err = eng.log.Debug(fmt.Sprintf("MONITOR: %s", encoded))
return
}
// Construct a new Analysis Engine.
func NewLoggingAnalysisEngine(logger *log.JSONLogger) AnalysisEngine {
return &LoggingAnalysisEngine{jsonLogger: logger}
func NewLoggingAnalysisEngine(logger *blog.AuditLogger) AnalysisEngine {
return &LoggingAnalysisEngine{log: logger}
}

View File

@ -6,6 +6,7 @@
package analysisengine
import (
"log/syslog"
"testing"
"github.com/letsencrypt/boulder/log"
@ -13,7 +14,8 @@ import (
)
func TestNewLoggingAnalysisEngine(t *testing.T) {
log := log.NewJSONLogger("newEngine")
writer, _ := syslog.New(syslog.LOG_EMERG|syslog.LOG_KERN, "tag")
log, _ := log.NewAuditLogger(writer)
ae := NewLoggingAnalysisEngine(log)
// Trivially check an empty mock message

View File

@ -6,14 +6,14 @@
package main
import (
"log"
"net/url"
"fmt"
"log/syslog"
"os"
"github.com/codegangsta/cli"
"github.com/streadway/amqp"
"github.com/letsencrypt/boulder/analysis"
"github.com/letsencrypt/boulder/cmd"
blog "github.com/letsencrypt/boulder/log"
)
@ -32,24 +32,13 @@ const (
AmqpImmediate = false
)
func startMonitor(AmqpURL string, logger *blog.JSONLogger) {
func startMonitor(rpcCh *amqp.Channel, logger *blog.AuditLogger) {
ae := analysisengine.NewLoggingAnalysisEngine(logger)
// For convenience at the broker, identifiy ourselves by hostname
consumerTag, err := os.Hostname()
if err != nil {
log.Fatalf("Could not determine hostname")
}
conn, err := amqp.Dial(AmqpURL)
if err != nil {
log.Fatalf("Could not connect to AMQP server: %s", err)
}
rpcCh, err := conn.Channel()
if err != nil {
log.Fatalf("Could not start channel: %s", err)
cmd.FailOnError(err, "Could not determine hostname")
}
err = rpcCh.ExchangeDeclare(
@ -61,7 +50,7 @@ func startMonitor(AmqpURL string, logger *blog.JSONLogger) {
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not declare exchange: %s", err)
cmd.FailOnError(err, "Could not declare exchange")
}
_, err = rpcCh.QueueDeclare(
@ -72,7 +61,7 @@ func startMonitor(AmqpURL string, logger *blog.JSONLogger) {
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not declare queue: %s", err)
cmd.FailOnError(err, "Could not declare queue")
}
err = rpcCh.QueueBind(
@ -82,7 +71,7 @@ func startMonitor(AmqpURL string, logger *blog.JSONLogger) {
false,
nil)
if err != nil {
log.Fatalf("Could not bind queue: %s", err)
cmd.FailOnError(err, "Could not bind queue")
}
deliveries, err := rpcCh.Consume(
@ -94,76 +83,37 @@ func startMonitor(AmqpURL string, logger *blog.JSONLogger) {
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not subscribe to queue: %s", err)
cmd.FailOnError(err, "Could not subscribe to queue")
}
// Run forever.
for d := range deliveries {
// Pass each message to the Analysis Engine
ae.ProcessMessage(d)
// Only ack the delivery we actually handled (ackMultiple=false)
const ackMultiple = false
d.Ack(ackMultiple)
err = ae.ProcessMessage(d)
if err != nil {
logger.Alert(fmt.Sprintf("Could not process message: %s", err))
} else {
// Only ack the delivery we actually handled (ackMultiple=false)
const ackMultiple = false
d.Ack(ackMultiple)
}
}
}
func main() {
app := cli.NewApp()
app.Name = "activity-monitor"
app.Usage = "Monitor Boulder's communications."
app.Version = "0.0.0"
app := cmd.NewAppShell("activity-monitor")
// Specify AMQP Server
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "amqp",
Value: "amqp://guest:guest@localhost:5672",
Usage: "AMQP Broker String",
},
cli.StringFlag{
Name: "jsonlog",
Usage: "JSON logging server and port (e.g., tcp://localhost:515)",
},
cli.BoolFlag{
Name: "stdout",
Usage: "Enable debug logging to stdout",
},
cli.IntFlag{
Name: "level",
Value: 4,
Usage: "Minimum Level to log (0-7), 7=Debug",
},
app.Action = func(c cmd.Config) {
syslogger, err := syslog.Dial(c.Syslog.Network, c.Syslog.Server, syslog.LOG_LOCAL0, c.Syslog.Tag)
auditlogger := blog.NewAuditLogger(syslogger)
cmd.FailOnError(err, "Could not connect to Syslog")
ch := cmd.AmqpChannel(c.AMQP.Server)
startMonitor(ch, auditlogger)
}
app.Action = func(c *cli.Context) {
logger := blog.NewJSONLogger("am")
// Parse SysLog URL if one was provided
if c.GlobalString("jsonlog") == "" {
log.Println("No external logging server; defaulting to stdout.")
logger.EnableStdOut(true)
} else {
syslogU, err := url.Parse(c.GlobalString("jsonlog"))
if err != nil {
log.Fatalf("Could not parse Syslog URL: %s", err)
}
logger.SetEndpoint(syslogU.Scheme, syslogU.Host)
err = logger.Connect()
if err != nil {
log.Fatalf("Could not open remote syslog: %s", err)
}
logger.EnableStdOut(c.GlobalBool("stdout"))
}
logger.SetLevel(c.GlobalInt("level"))
startMonitor(c.GlobalString("amqp"), logger)
}
if err := app.Run(os.Args); err != nil {
log.Fatalf("Could not start: %s", err)
}
app.Run()
}

View File

@ -62,6 +62,12 @@ type Config struct {
DBDriver string
DBName string
}
Syslog struct {
Network string
Server string
Tag string
}
}
type QueuePair struct {
@ -99,6 +105,7 @@ func (as *AppShell) Run() {
var config Config
err = json.Unmarshal(configJSON, &config)
FailOnError(err, "Failed to read configuration")
as.Action(config)
}

40
log/audit-logger.go Normal file
View File

@ -0,0 +1,40 @@
// Copyright 2015 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package log
import (
"errors"
"fmt"
"log/syslog"
)
// The constant used to identify audit-specific messages
const auditTag = "[AUDIT]"
// AuditLogger is a System Logger with additional audit-specific methods.
// In addition to all the standard syslog.Writer methods from
// http://golang.org/pkg/log/syslog/#Writer, you can also call
// auditLogger.Audit(msg string)
// to send a message as an audit event.
type AuditLogger struct {
*syslog.Writer
}
// NewAuditLogger constructs an Audit Logger that decorates a normal
// System Logger. All methods in log/syslog continue to work.
func NewAuditLogger(log *syslog.Writer) (*AuditLogger, error) {
if log == nil {
return nil, errors.New("Attempted to use a nil System Logger.")
}
return &AuditLogger{log}, nil
}
// Audit sends a NOTICE-severity message that is prefixed with the
// audit tag, for special handling at the upstream system logger.
func (log *AuditLogger) Audit(msg string) (err error) {
err = log.Notice(fmt.Sprintf("%s %s", auditTag, msg))
return
}

63
log/audit-logger_test.go Normal file
View File

@ -0,0 +1,63 @@
// Copyright 2014 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package log
import (
"log/syslog"
"testing"
"github.com/letsencrypt/boulder/test"
)
func TestConstruction(t *testing.T) {
writer, err := syslog.New(syslog.LOG_EMERG|syslog.LOG_KERN, "tag")
test.AssertNotError(t, err, "Could not construct syslog object")
_, err = NewAuditLogger(writer)
test.AssertNotError(t, err, "Could not construct audit logger")
}
func TestConstructionNil(t *testing.T) {
_, err := NewAuditLogger(nil)
test.AssertError(t, err, "Nil shouldn't be permitted.")
}
func TestEmit(t *testing.T) {
writer, err := syslog.New(syslog.LOG_EMERG|syslog.LOG_KERN, "tag")
test.AssertNotError(t, err, "Could not construct syslog object")
audit, err := NewAuditLogger(writer)
test.AssertNotError(t, err, "Could not construct audit logger")
audit.Audit("test message")
}
func TestEmitEmpty(t *testing.T) {
writer, err := syslog.New(syslog.LOG_EMERG|syslog.LOG_KERN, "tag")
test.AssertNotError(t, err, "Could not construct syslog object")
audit, err := NewAuditLogger(writer)
test.AssertNotError(t, err, "Could not construct audit logger")
audit.Audit("")
}
func TestSyslogMethods(t *testing.T) {
writer, err := syslog.New(syslog.LOG_EMERG|syslog.LOG_KERN, "tag")
test.AssertNotError(t, err, "Could not construct syslog object")
audit, err := NewAuditLogger(writer)
test.AssertNotError(t, err, "Could not construct audit logger")
audit.Audit("audit-logger_test.go: audit-notice")
audit.Crit("audit-logger_test.go: critical")
audit.Debug("audit-logger_test.go: debug")
audit.Emerg("audit-logger_test.go: emerg")
audit.Err("audit-logger_test.go: err")
audit.Info("audit-logger_test.go: info")
audit.Notice("audit-logger_test.go: notice")
audit.Warning("audit-logger_test.go: warning")
}

View File

@ -1,188 +0,0 @@
// Copyright 2014 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package log
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net"
"sync"
)
// This package transmits JSON data over IP. It is designed to follow
// general conventions for syslog, but uses JSON encoding instead of
// the RFC 5424 strings.
//
// The JSON encoding is suitable for import by Logstash's "json_lines" CODEC
// module.
//
// NOTE: In TCP mode, this package attempts to retransmit in the event of a
// channel failure. If the retransmission fails, it aborts and expects to
// be restarted by the process controller.
// Use the same Severity levels as RFC 5424.
// Note: RFC 5424 Facility is not used.
const (
EMERGENCY = 0
ALERT = 1
CRITICAL = 2
ERROR = 3
WARNING = 4
NOTICE = 5
INFO = 6
DEBUG = 7
)
// JSON Schema for the Log Messages on the wire.
type LogMessage struct {
// User-readable descriptive message; may be null.
Message string `json:"message"`
// Sub-object; must be JSON-formattable.
Payload interface{} `json:"payload"`
// Logger identifier
Program string `json:"program"`
// RFC 5424 severity level
Severity int `json:"severity"`
}
// Structure to hold logger details.
type JSONLogger struct {
stdout bool // True if logging to stdout (independent of network)
online bool // True if logging to network
scheme string // Golang net URI scheme (tcp/udp)
host string // "address:port"
level int // Maximum-transmitted log level
conn net.Conn // Socket representation
mu sync.Mutex // guards conn
program string // Defines the 'program' field in JSON
}
func NewJSONLogger(programName string) *JSONLogger {
return &JSONLogger{
program: programName,
level: 7, // Default to all
}
}
func (jl *JSONLogger) EnableStdOut(stdout bool) {
jl.stdout = stdout
}
func (jl *JSONLogger) SetLevel(level int) {
jl.level = level
}
func (jl *JSONLogger) SetEndpoint(scheme string, host string) {
jl.scheme = scheme
jl.host = host
jl.online = true
}
func (jl *JSONLogger) Connect() error {
conn, err := net.Dial(jl.scheme, jl.host)
if err == nil {
jl.conn = conn
}
return err
}
// Log at the Critical severity level.
func (jl *JSONLogger) Critical(messageStr string, payloadObj interface{}) {
jl.Write(CRITICAL, messageStr, payloadObj)
}
// Log at the Alert severity level.
func (jl *JSONLogger) Alert(messageStr string, payloadObj interface{}) {
jl.Write(ALERT, messageStr, payloadObj)
}
// Log at the Emergency severity level.
func (jl *JSONLogger) Emergency(messageStr string, payloadObj interface{}) {
jl.Write(EMERGENCY, messageStr, payloadObj)
}
// Log at the Error severity level.
func (jl *JSONLogger) Error(messageStr string, payloadObj interface{}) {
jl.Write(ERROR, messageStr, payloadObj)
}
// Log at the Warning severity level.
func (jl *JSONLogger) Warning(messageStr string, payloadObj interface{}) {
jl.Write(WARNING, messageStr, payloadObj)
}
// Log at the Notice severity level.
func (jl *JSONLogger) Notice(messageStr string, payloadObj interface{}) {
jl.Write(NOTICE, messageStr, payloadObj)
}
// Log at the Info severity level.
func (jl *JSONLogger) Info(messageStr string, payloadObj interface{}) {
jl.Write(INFO, messageStr, payloadObj)
}
// Log at the Debug severity level.
func (jl *JSONLogger) Debug(messageStr string, payloadObj interface{}) {
jl.Write(DEBUG, messageStr, payloadObj)
}
// Combines a message, payload, and severity to a LogMessage struct and
// serializes it to the wire. If the send via WriteAndRetry() fails, this method
// calls log.Fatalf() which will abort the program, leaving the system to restart
// the process.
func (jl *JSONLogger) Write(severity int, messageStr string, payloadObj interface{}) {
if severity > jl.level {
return
}
data := LogMessage{
Program: jl.program,
Payload: payloadObj,
Message: messageStr,
Severity: severity}
encoded, err := json.Marshal(data)
if err != nil {
log.Printf("Could not marshal log message: %s\n", err)
return
}
buf := bytes.NewBuffer(encoded)
buf.WriteByte('\n') // Append a newline
if jl.stdout {
log.Println(fmt.Sprintf("<%d> %s", severity, buf.String()))
}
if jl.online {
// If we've been told to be connected, write to the socket.
_, err = jl.WriteAndRetry(buf.Bytes())
if err != nil {
log.Fatalf("Failed to send log message, even with retry, exiting: %s\n", buf.String())
}
}
}
// Send the provided data on the connection; if there is an error,
// it will retry to connect and transmit again, once. If that fails,
// it returns an error.
func (jl *JSONLogger) WriteAndRetry(data []byte) (int, error) {
jl.mu.Lock()
defer jl.mu.Unlock()
if jl.conn != nil {
if n, err := jl.conn.Write(data); err == nil {
return n, err
}
}
if err := jl.Connect(); err != nil {
return 0, err
}
return jl.conn.Write(data)
}

View File

@ -1,251 +0,0 @@
// Copyright 2014 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package log
import (
"bufio"
"fmt"
"net"
"strings"
"testing"
"time"
"github.com/letsencrypt/boulder/test"
)
const TimeoutIndicator = "<TIMEOUT>"
func readChanWithTimeout(outChan <-chan string) string {
timeout := time.After(time.Second)
select {
case line := <-outChan:
return line
case <-timeout:
return TimeoutIndicator
}
}
func awaitMessage(t *testing.T, scheme string, address string) (net.Listener, <-chan string) {
outChan := make(chan string)
socket, err := net.Listen(scheme, address)
test.AssertNotError(t, err, "Could not listen")
recvLog := func() {
conn, err := socket.Accept()
if conn == nil {
t.Error("Conn nil; programmer error in test.")
return
}
defer func() {
conn.Close()
fmt.Println("Exiting")
}()
test.AssertNotError(t, err, "Could not accept")
reader := bufio.NewReader(conn)
for {
conn.SetDeadline(time.Now().Add(time.Second))
line, _ := reader.ReadString('\n')
// Emit the line if it's not-empty.
if line != "" {
outChan <- line
}
}
}
go recvLog()
// Let the caller close the socket
return socket, outChan
}
func TestWriteTcp(t *testing.T) {
const Scheme = "tcp"
const Address = "127.0.0.1:9999"
socket, outChan := awaitMessage(t, Scheme, Address)
defer socket.Close()
log := NewJSONLogger("just a test")
log.SetEndpoint(Scheme, Address)
msg := "Test " + Scheme + " " + Address
log.Critical(msg, nil)
rsp := <-outChan
test.AssertContains(t, rsp, msg)
test.AssertSeverity(t, rsp, CRITICAL)
}
func TestWriteNoNetwork(t *testing.T) {
log := NewJSONLogger("just a test")
log.Debug("Check", nil)
// Nothing to assert
log.EnableStdOut(true)
log.Debug("Check", nil)
// Nothing to assert
}
func TestWriteUnMarshallable(t *testing.T) {
const Scheme = "tcp"
const Address = "127.0.0.1:9998"
socket, outChan := awaitMessage(t, Scheme, Address)
defer socket.Close()
log := NewJSONLogger("please don't work")
log.SetEndpoint(Scheme, Address)
log.Connect()
log.Debug("Check", func() {})
rsp := readChanWithTimeout(outChan)
test.AssertEquals(t, rsp, TimeoutIndicator)
}
func TestWriteTcpAllLevels(t *testing.T) {
const Scheme = "tcp"
const Address = "127.0.0.1:9997"
socket, outChan := awaitMessage(t, Scheme, Address)
defer socket.Close()
log := NewJSONLogger("just a test")
log.SetEndpoint(Scheme, Address)
msg := "Test " + Scheme + " " + Address
{
log.Critical(msg, msg)
rsp := <-outChan
test.AssertSeverity(t, rsp, CRITICAL)
test.AssertContains(t, rsp, msg)
}
{
log.Alert(msg, msg)
rsp := <-outChan
test.AssertSeverity(t, rsp, ALERT)
test.AssertContains(t, rsp, msg)
}
{
log.Emergency(msg, msg)
rsp := <-outChan
test.AssertSeverity(t, rsp, EMERGENCY)
test.AssertContains(t, rsp, msg)
}
{
log.Error(msg, msg)
rsp := <-outChan
test.AssertSeverity(t, rsp, ERROR)
test.AssertContains(t, rsp, msg)
}
{
log.Warning(msg, msg)
rsp := <-outChan
test.AssertSeverity(t, rsp, WARNING)
test.AssertContains(t, rsp, msg)
}
{
log.Notice(msg, msg)
rsp := <-outChan
test.AssertSeverity(t, rsp, NOTICE)
test.AssertContains(t, rsp, msg)
}
{
log.Info(msg, msg)
rsp := <-outChan
test.AssertSeverity(t, rsp, INFO)
test.AssertContains(t, rsp, msg)
}
{
log.Debug(msg, msg)
rsp := <-outChan
test.AssertSeverity(t, rsp, DEBUG)
test.AssertContains(t, rsp, msg)
}
}
func TestLevelMasking(t *testing.T) {
const Scheme = "tcp"
const Address = "127.0.0.1:9996"
socket, outChan := awaitMessage(t, Scheme, Address)
defer socket.Close()
log := NewJSONLogger("just a test")
log.SetEndpoint(Scheme, Address)
msg := "Test " + Scheme + " " + Address
{
log.Info(msg, msg)
rsp := readChanWithTimeout(outChan)
test.AssertSeverity(t, rsp, INFO)
test.AssertContains(t, rsp, msg)
}
// Notice and lower numbers should emit; Info should not.
log.SetLevel(NOTICE)
{
log.Info(msg, msg)
rsp := readChanWithTimeout(outChan)
test.AssertEquals(t, rsp, TimeoutIndicator)
}
// Warning, being lower than Notice, should emit.
{
log.Warning(msg, msg)
rsp := readChanWithTimeout(outChan)
test.AssertSeverity(t, rsp, WARNING)
test.AssertContains(t, rsp, msg)
}
}
func TestEmbeddedNewline(t *testing.T) {
const Scheme = "tcp"
const Address = "127.0.0.1:9995"
socket, outChan := awaitMessage(t, Scheme, Address)
defer socket.Close()
log := NewJSONLogger("embedded newline")
log.SetEndpoint(Scheme, Address)
payload := struct {
One string
Two string
}{
One: "A\nTOYOTA'S\nA\nTOYOTA",
Two: "\n\n\n\n\n",
}
msg := "There's a newline in the payload:"
log.Critical(msg, payload)
rsp := <-outChan
test.AssertContains(t, rsp, msg)
test.AssertSeverity(t, rsp, CRITICAL)
// I can't do an test.AssertContains directly because rsp is escaped, while the
// payload values are not. Since escaping routines are not so easy to find,
// payload I can't just JSON-marshal (because that is a loopback test),
// I do it manually.
test.AssertContains(t, rsp, strings.Replace(payload.One, "\n", "\\n", -1))
test.AssertContains(t, rsp, strings.Replace(payload.Two, "\n", "\\n", -1))
}

View File

@ -23,6 +23,12 @@ func AssertNotError(t *testing.T, err error, message string) {
}
}
func AssertError(t *testing.T, err error, message string) {
if err == nil {
t.Error(message, err)
}
}
func AssertEquals(t *testing.T, one string, two string) {
if one != two {
t.Errorf("String [%s] != [%s]", one, two)