diff --git a/contrib/completion/bash/docker b/contrib/completion/bash/docker index 447a75e03b..4b8d9bd209 100644 --- a/contrib/completion/bash/docker +++ b/contrib/completion/bash/docker @@ -279,6 +279,7 @@ __docker_log_driver_options() { local gelf_options="gelf-address gelf-tag" local json_file_options="max-file max-size" local syslog_options="syslog-address syslog-facility syslog-tag" + local awslogs_options="awslogs-region awslogs-group awslogs-stream" case $(__docker_value_of_option --log-driver) in '') @@ -296,6 +297,9 @@ __docker_log_driver_options() { syslog) COMPREPLY=( $( compgen -W "$syslog_options" -S = -- "$cur" ) ) ;; + awslogs) + COMPREPLY=( $( compgen -W "$awslogs_options" -S = -- "$cur" ) ) + ;; *) return ;; diff --git a/contrib/completion/zsh/_docker b/contrib/completion/zsh/_docker index 964cf6fda6..fc894c84b5 100644 --- a/contrib/completion/zsh/_docker +++ b/contrib/completion/zsh/_docker @@ -238,7 +238,7 @@ __docker_subcommand() { "($help)--ipc=-[IPC namespace to use]:IPC namespace: " "($help)*--link=-[Add link to another container]:link:->link" "($help)*"{-l,--label=-}"[Set meta data on a container]:label: " - "($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd none)" + "($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd awslogs none)" "($help)*--log-opt=-[Log driver specific options]:log driver options: " "($help)*--lxc-conf=-[Add custom lxc options]:lxc options: " "($help)--mac-address=-[Container MAC address]:MAC address: " @@ -617,7 +617,7 @@ _docker() { "($help)--ipv6[Enable IPv6 networking]" \ "($help -l --log-level)"{-l,--log-level=-}"[Set the logging level]:level:(debug info warn error fatal)" \ "($help)*--label=-[Set key=value labels to the daemon]:label: " \ - "($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd none)" \ + "($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd awslogs none)" \ "($help)*--log-opt=-[Log driver specific options]:log driver options: " \ "($help)--mtu=-[Set the containers network MTU]:mtu:(0 576 1420 1500 9000)" \ "($help -p --pidfile)"{-p,--pidfile=-}"[Path to use for daemon PID file]:PID file:_files" \ diff --git a/daemon/logdrivers_linux.go b/daemon/logdrivers_linux.go index 3ff518fccc..1d51a4a09e 100644 --- a/daemon/logdrivers_linux.go +++ b/daemon/logdrivers_linux.go @@ -3,6 +3,7 @@ package daemon import ( // Importing packages here only to make sure their init gets called and // therefore they register themselves to the logdriver factory. + _ "github.com/docker/docker/daemon/logger/awslogs" _ "github.com/docker/docker/daemon/logger/fluentd" _ "github.com/docker/docker/daemon/logger/gelf" _ "github.com/docker/docker/daemon/logger/journald" diff --git a/daemon/logdrivers_windows.go b/daemon/logdrivers_windows.go index df3bd1f0bf..3b4a1f3ac3 100644 --- a/daemon/logdrivers_windows.go +++ b/daemon/logdrivers_windows.go @@ -3,5 +3,6 @@ package daemon import ( // Importing packages here only to make sure their init gets called and // therefore they register themselves to the logdriver factory. + _ "github.com/docker/docker/daemon/logger/awslogs" _ "github.com/docker/docker/daemon/logger/jsonfilelog" ) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go new file mode 100644 index 0000000000..f87c513362 --- /dev/null +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -0,0 +1,326 @@ +// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs +package awslogs + +import ( + "fmt" + "os" + "sort" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/docker/docker/daemon/logger" + "github.com/docker/docker/vendor/src/github.com/Sirupsen/logrus" +) + +const ( + name = "awslogs" + regionKey = "awslogs-region" + regionEnvKey = "AWS_REGION" + logGroupKey = "awslogs-group" + logStreamKey = "awslogs-stream" + batchPublishFrequency = 5 * time.Second + + // See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html + perEventBytes = 26 + maximumBytesPerPut = 1048576 + maximumLogEventsPerPut = 10000 + + // See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html + maximumBytesPerEvent = 262144 - perEventBytes + + resourceAlreadyExistsCode = "ResourceAlreadyExistsException" + dataAlreadyAcceptedCode = "DataAlreadyAcceptedException" + invalidSequenceTokenCode = "InvalidSequenceTokenException" +) + +type logStream struct { + logStreamName string + logGroupName string + client api + messages chan *logger.Message + lock sync.RWMutex + closed bool + sequenceToken *string +} + +type api interface { + CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) + PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) +} + +type byTimestamp []*cloudwatchlogs.InputLogEvent + +// init registers the awslogs driver and sets the default region, if provided +func init() { + if os.Getenv(regionEnvKey) != "" { + aws.DefaultConfig.Region = aws.String(os.Getenv(regionEnvKey)) + } + if err := logger.RegisterLogDriver(name, New); err != nil { + logrus.Fatal(err) + } + if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil { + logrus.Fatal(err) + } +} + +// New creates an awslogs logger using the configuration passed in on the +// context. Supported context configuration variables are awslogs-region, +// awslogs-group, and awslogs-stream. When available, configuration is +// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID, +// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and +// the EC2 Instance Metadata Service. +func New(ctx logger.Context) (logger.Logger, error) { + logGroupName := ctx.Config[logGroupKey] + logStreamName := ctx.ContainerID + if ctx.Config[logStreamKey] != "" { + logStreamName = ctx.Config[logStreamKey] + } + config := aws.DefaultConfig + if ctx.Config[regionKey] != "" { + config = aws.DefaultConfig.Merge(&aws.Config{ + Region: aws.String(ctx.Config[regionKey]), + }) + } + containerStream := &logStream{ + logStreamName: logStreamName, + logGroupName: logGroupName, + client: cloudwatchlogs.New(config), + messages: make(chan *logger.Message, 4096), + } + err := containerStream.create() + if err != nil { + return nil, err + } + go containerStream.collectBatch() + + return containerStream, nil +} + +// Name returns the name of the awslogs logging driver +func (l *logStream) Name() string { + return name +} + +// Log submits messages for logging by an instance of the awslogs logging driver +func (l *logStream) Log(msg *logger.Message) error { + l.lock.RLock() + defer l.lock.RUnlock() + if !l.closed { + l.messages <- msg + } + return nil +} + +// Close closes the instance of the awslogs logging driver +func (l *logStream) Close() error { + l.lock.Lock() + defer l.lock.Unlock() + if !l.closed { + close(l.messages) + } + l.closed = true + return nil +} + +// create creates a log stream for the instance of the awslogs logging driver +func (l *logStream) create() error { + input := &cloudwatchlogs.CreateLogStreamInput{ + LogGroupName: aws.String(l.logGroupName), + LogStreamName: aws.String(l.logStreamName), + } + + _, err := l.client.CreateLogStream(input) + + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + fields := logrus.Fields{ + "errorCode": awsErr.Code(), + "message": awsErr.Message(), + "origError": awsErr.OrigErr(), + "logGroupName": l.logGroupName, + "logStreamName": l.logStreamName, + } + if awsErr.Code() == resourceAlreadyExistsCode { + // Allow creation to succeed + logrus.WithFields(fields).Info("Log stream already exists") + return nil + } + logrus.WithFields(fields).Error("Failed to create log stream") + } + } + return err +} + +// newTicker is used for time-based batching. newTicker is a variable such +// that the implementation can be swapped out for unit tests. +var newTicker = func(freq time.Duration) *time.Ticker { + return time.NewTicker(freq) +} + +// collectBatch executes as a goroutine to perform batching of log events for +// submission to the log stream. Batching is performed on time- and size- +// bases. Time-based batching occurs at a 5 second interval (defined in the +// batchPublishFrequency const). Size-based batching is performed on the +// maximum number of events per batch (defined in maximumLogEventsPerPut) and +// the maximum number of total bytes in a batch (defined in +// maximumBytesPerPut). Log messages are split by the maximum bytes per event +// (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead +// (defined in perEventBytes) which is accounted for in split- and batch- +// calculations. +func (l *logStream) collectBatch() { + timer := newTicker(batchPublishFrequency) + var events []*cloudwatchlogs.InputLogEvent + bytes := 0 + for { + select { + case <-timer.C: + l.publishBatch(events) + events = events[:0] + bytes = 0 + case msg, more := <-l.messages: + if !more { + l.publishBatch(events) + return + } + unprocessedLine := msg.Line + for len(unprocessedLine) > 0 { + // Split line length so it does not exceed the maximum + lineBytes := len(unprocessedLine) + if lineBytes > maximumBytesPerEvent { + lineBytes = maximumBytesPerEvent + } + line := unprocessedLine[:lineBytes] + unprocessedLine = unprocessedLine[lineBytes:] + if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) { + // Publish an existing batch if it's already over the maximum number of events or if adding this + // event would push it over the maximum number of total bytes. + l.publishBatch(events) + events = events[:0] + bytes = 0 + } + events = append(events, &cloudwatchlogs.InputLogEvent{ + Message: aws.String(string(line)), + Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)), + }) + bytes += (lineBytes + perEventBytes) + } + } + } +} + +// publishBatch calls PutLogEvents for a given set of InputLogEvents, +// accounting for sequencing requirements (each request must reference the +// sequence token returned by the previous request). +func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) { + if len(events) == 0 { + return + } + + sort.Sort(byTimestamp(events)) + + nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken) + + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == dataAlreadyAcceptedCode { + // already submitted, just grab the correct sequence token + parts := strings.Split(awsErr.Message(), " ") + nextSequenceToken = &parts[len(parts)-1] + logrus.WithFields(logrus.Fields{ + "errorCode": awsErr.Code(), + "message": awsErr.Message(), + "logGroupName": l.logGroupName, + "logStreamName": l.logStreamName, + }).Info("Data already accepted, ignoring error") + err = nil + } else if awsErr.Code() == invalidSequenceTokenCode { + // sequence code is bad, grab the correct one and retry + parts := strings.Split(awsErr.Message(), " ") + token := parts[len(parts)-1] + nextSequenceToken, err = l.putLogEvents(events, &token) + } + } + } + if err != nil { + logrus.Error(err) + } else { + l.sequenceToken = nextSequenceToken + } +} + +// putLogEvents wraps the PutLogEvents API +func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) { + input := &cloudwatchlogs.PutLogEventsInput{ + LogEvents: events, + SequenceToken: sequenceToken, + LogGroupName: aws.String(l.logGroupName), + LogStreamName: aws.String(l.logStreamName), + } + resp, err := l.client.PutLogEvents(input) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + logrus.WithFields(logrus.Fields{ + "errorCode": awsErr.Code(), + "message": awsErr.Message(), + "origError": awsErr.OrigErr(), + "logGroupName": l.logGroupName, + "logStreamName": l.logStreamName, + }).Error("Failed to put log events") + } + return nil, err + } + return resp.NextSequenceToken, nil +} + +// ValidateLogOpt looks for awslogs-specific log options awslogs-region, +// awslogs-group, and awslogs-stream +func ValidateLogOpt(cfg map[string]string) error { + for key := range cfg { + switch key { + case logGroupKey: + case logStreamKey: + case regionKey: + default: + return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name) + } + } + if cfg[logGroupKey] == "" { + return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey) + } + if cfg[regionKey] == "" && os.Getenv(regionEnvKey) == "" { + return fmt.Errorf( + "must specify a value for environment variable '%s' or log opt '%s'", + regionEnvKey, + regionKey) + } + return nil +} + +// Len returns the length of a byTimestamp slice. Len is required by the +// sort.Interface interface. +func (slice byTimestamp) Len() int { + return len(slice) +} + +// Less compares two values in a byTimestamp slice by Timestamp. Less is +// required by the sort.Interface interface. +func (slice byTimestamp) Less(i, j int) bool { + iTimestamp, jTimestamp := int64(0), int64(0) + if slice != nil && slice[i].Timestamp != nil { + iTimestamp = *slice[i].Timestamp + } + if slice != nil && slice[j].Timestamp != nil { + jTimestamp = *slice[j].Timestamp + } + return iTimestamp < jTimestamp +} + +// Swap swaps two values in a byTimestamp slice with each other. Swap is +// required by the sort.Interface interface. +func (slice byTimestamp) Swap(i, j int) { + slice[i], slice[j] = slice[j], slice[i] +} diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go new file mode 100644 index 0000000000..680cfde673 --- /dev/null +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -0,0 +1,572 @@ +package awslogs + +import ( + "errors" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + "github.com/docker/docker/daemon/logger" +) + +const ( + groupName = "groupName" + streamName = "streamName" + sequenceToken = "sequenceToken" + nextSequenceToken = "nextSequenceToken" + logline = "this is a log line" +) + +func TestCreateSuccess(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + } + mockClient.createLogStreamResult <- &createLogStreamResult{} + + err := stream.create() + + if err != nil { + t.Errorf("Received unexpected err: %v\n", err) + } + argument := <-mockClient.createLogStreamArgument + if argument.LogGroupName == nil { + t.Fatal("Expected non-nil LogGroupName") + } + if *argument.LogGroupName != groupName { + t.Errorf("Expected LogGroupName to be %s", groupName) + } + if argument.LogStreamName == nil { + t.Fatal("Expected non-nil LogGroupName") + } + if *argument.LogStreamName != streamName { + t.Errorf("Expected LogStreamName to be %s", streamName) + } +} + +func TestCreateError(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + } + mockClient.createLogStreamResult <- &createLogStreamResult{ + errorResult: errors.New("Error!"), + } + + err := stream.create() + + if err == nil { + t.Fatal("Expected non-nil err") + } +} + +func TestCreateAlreadyExists(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + } + mockClient.createLogStreamResult <- &createLogStreamResult{ + errorResult: awserr.New(resourceAlreadyExistsCode, "", nil), + } + + err := stream.create() + + if err != nil { + t.Fatal("Expected nil err") + } +} + +func TestPublishBatchSuccess(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + + events := []*cloudwatchlogs.InputLogEvent{ + { + Message: aws.String(logline), + }, + } + + stream.publishBatch(events) + if stream.sequenceToken == nil { + t.Fatal("Expected non-nil sequenceToken") + } + if *stream.sequenceToken != nextSequenceToken { + t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken) + } + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if argument.SequenceToken == nil { + t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken") + } + if *argument.SequenceToken != sequenceToken { + t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken) + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) + } + if argument.LogEvents[0] != events[0] { + t.Error("Expected event to equal input") + } +} + +func TestPublishBatchError(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + errorResult: errors.New("Error!"), + } + + events := []*cloudwatchlogs.InputLogEvent{ + { + Message: aws.String(logline), + }, + } + + stream.publishBatch(events) + if stream.sequenceToken == nil { + t.Fatal("Expected non-nil sequenceToken") + } + if *stream.sequenceToken != sequenceToken { + t.Errorf("Expected sequenceToken to be %s, but was %s", sequenceToken, *stream.sequenceToken) + } +} + +func TestPublishBatchInvalidSeqSuccess(t *testing.T) { + mockClient := newMockClientBuffered(2) + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + errorResult: awserr.New(invalidSequenceTokenCode, "use token token", nil), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + + events := []*cloudwatchlogs.InputLogEvent{ + { + Message: aws.String(logline), + }, + } + + stream.publishBatch(events) + if stream.sequenceToken == nil { + t.Fatal("Expected non-nil sequenceToken") + } + if *stream.sequenceToken != nextSequenceToken { + t.Errorf("Expected sequenceToken to be %s, but was %s", nextSequenceToken, *stream.sequenceToken) + } + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if argument.SequenceToken == nil { + t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken") + } + if *argument.SequenceToken != sequenceToken { + t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken) + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) + } + if argument.LogEvents[0] != events[0] { + t.Error("Expected event to equal input") + } + + argument = <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if argument.SequenceToken == nil { + t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken") + } + if *argument.SequenceToken != "token" { + t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", "token", *argument.SequenceToken) + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) + } + if argument.LogEvents[0] != events[0] { + t.Error("Expected event to equal input") + } +} + +func TestPublishBatchAlreadyAccepted(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil), + } + + events := []*cloudwatchlogs.InputLogEvent{ + { + Message: aws.String(logline), + }, + } + + stream.publishBatch(events) + if stream.sequenceToken == nil { + t.Fatal("Expected non-nil sequenceToken") + } + if *stream.sequenceToken != "token" { + t.Errorf("Expected sequenceToken to be %s, but was %s", "token", *stream.sequenceToken) + } + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if argument.SequenceToken == nil { + t.Fatal("Expected non-nil PutLogEventsInput.SequenceToken") + } + if *argument.SequenceToken != sequenceToken { + t.Errorf("Expected PutLogEventsInput.SequenceToken to be %s, but was %s", sequenceToken, *argument.SequenceToken) + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) + } + if argument.LogEvents[0] != events[0] { + t.Error("Expected event to equal input") + } +} + +func TestCollectBatchSimple(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Time{}, + }) + + ticks <- time.Time{} + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != logline { + t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message) + } +} + +func TestCollectBatchTicker(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + stream.Log(&logger.Message{ + Line: []byte(logline + " 1"), + Timestamp: time.Time{}, + }) + stream.Log(&logger.Message{ + Line: []byte(logline + " 2"), + Timestamp: time.Time{}, + }) + + ticks <- time.Time{} + + // Verify first batch + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 2 { + t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != logline+" 1" { + t.Errorf("Expected message to be %s but was %s", logline+" 1", *argument.LogEvents[0].Message) + } + if *argument.LogEvents[1].Message != logline+" 2" { + t.Errorf("Expected message to be %s but was %s", logline+" 2", *argument.LogEvents[0].Message) + } + + stream.Log(&logger.Message{ + Line: []byte(logline + " 3"), + Timestamp: time.Time{}, + }) + + ticks <- time.Time{} + argument = <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != logline+" 3" { + t.Errorf("Expected message to be %s but was %s", logline+" 3", *argument.LogEvents[0].Message) + } + + stream.Close() + +} + +func TestCollectBatchClose(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + var ticks = make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + stream.Log(&logger.Message{ + Line: []byte(logline), + Timestamp: time.Time{}, + }) + + // no ticks + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != logline { + t.Errorf("Expected message to be %s but was %s", logline, *argument.LogEvents[0].Message) + } +} + +func TestCollectBatchLineSplit(t *testing.T) { + mockClient := newMockClient() + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + var ticks = make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + longline := strings.Repeat("A", maximumBytesPerEvent) + stream.Log(&logger.Message{ + Line: []byte(longline + "B"), + Timestamp: time.Time{}, + }) + + // no ticks + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 2 { + t.Errorf("Expected LogEvents to contain 2 elements, but contains %d", len(argument.LogEvents)) + } + if *argument.LogEvents[0].Message != longline { + t.Errorf("Expected message to be %s but was %s", longline, *argument.LogEvents[0].Message) + } + if *argument.LogEvents[1].Message != "B" { + t.Errorf("Expected message to be %s but was %s", "B", *argument.LogEvents[1].Message) + } +} + +func TestCollectBatchMaxEvents(t *testing.T) { + mockClient := newMockClientBuffered(1) + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + var ticks = make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + line := "A" + for i := 0; i <= maximumLogEventsPerPut; i++ { + stream.Log(&logger.Message{ + Line: []byte(line), + Timestamp: time.Time{}, + }) + } + + // no ticks + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != maximumLogEventsPerPut { + t.Errorf("Expected LogEvents to contain %d elements, but contains %d", maximumLogEventsPerPut, len(argument.LogEvents)) + } + + argument = <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain %d elements, but contains %d", 1, len(argument.LogEvents)) + } +} + +func TestCollectBatchMaxTotalBytes(t *testing.T) { + mockClient := newMockClientBuffered(1) + stream := &logStream{ + client: mockClient, + logGroupName: groupName, + logStreamName: streamName, + sequenceToken: aws.String(sequenceToken), + messages: make(chan *logger.Message), + } + mockClient.putLogEventsResult <- &putLogEventsResult{ + successResult: &cloudwatchlogs.PutLogEventsOutput{ + NextSequenceToken: aws.String(nextSequenceToken), + }, + } + var ticks = make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + + go stream.collectBatch() + + longline := strings.Repeat("A", maximumBytesPerPut) + stream.Log(&logger.Message{ + Line: []byte(longline + "B"), + Timestamp: time.Time{}, + }) + + // no ticks + stream.Close() + + argument := <-mockClient.putLogEventsArgument + if argument == nil { + t.Fatal("Expected non-nil PutLogEventsInput") + } + bytes := 0 + for _, event := range argument.LogEvents { + bytes += len(*event.Message) + } + if bytes > maximumBytesPerPut { + t.Errorf("Expected <= %d bytes but was %d", maximumBytesPerPut, bytes) + } + + argument = <-mockClient.putLogEventsArgument + if len(argument.LogEvents) != 1 { + t.Errorf("Expected LogEvents to contain 1 elements, but contains %d", len(argument.LogEvents)) + } + message := *argument.LogEvents[0].Message + if message[len(message)-1:] != "B" { + t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:]) + } +} diff --git a/daemon/logger/awslogs/cwlogsiface_mock_test.go b/daemon/logger/awslogs/cwlogsiface_mock_test.go new file mode 100644 index 0000000000..64ea1bc829 --- /dev/null +++ b/daemon/logger/awslogs/cwlogsiface_mock_test.go @@ -0,0 +1,56 @@ +package awslogs + +import "github.com/aws/aws-sdk-go/service/cloudwatchlogs" + +type mockcwlogsclient struct { + createLogStreamArgument chan *cloudwatchlogs.CreateLogStreamInput + createLogStreamResult chan *createLogStreamResult + putLogEventsArgument chan *cloudwatchlogs.PutLogEventsInput + putLogEventsResult chan *putLogEventsResult +} + +type createLogStreamResult struct { + successResult *cloudwatchlogs.CreateLogStreamOutput + errorResult error +} + +type putLogEventsResult struct { + successResult *cloudwatchlogs.PutLogEventsOutput + errorResult error +} + +func newMockClient() *mockcwlogsclient { + return &mockcwlogsclient{ + createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, 1), + createLogStreamResult: make(chan *createLogStreamResult, 1), + putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, 1), + putLogEventsResult: make(chan *putLogEventsResult, 1), + } +} + +func newMockClientBuffered(buflen int) *mockcwlogsclient { + return &mockcwlogsclient{ + createLogStreamArgument: make(chan *cloudwatchlogs.CreateLogStreamInput, buflen), + createLogStreamResult: make(chan *createLogStreamResult, buflen), + putLogEventsArgument: make(chan *cloudwatchlogs.PutLogEventsInput, buflen), + putLogEventsResult: make(chan *putLogEventsResult, buflen), + } +} + +func (m *mockcwlogsclient) CreateLogStream(input *cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error) { + m.createLogStreamArgument <- input + output := <-m.createLogStreamResult + return output.successResult, output.errorResult +} + +func (m *mockcwlogsclient) PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error) { + m.putLogEventsArgument <- input + output := <-m.putLogEventsResult + return output.successResult, output.errorResult +} + +func test() { + _ = &logStream{ + client: newMockClient(), + } +} diff --git a/docs/reference/api/docker_remote_api_v1.21.md b/docs/reference/api/docker_remote_api_v1.21.md index d2ca3337fe..ee178cc7a1 100644 --- a/docs/reference/api/docker_remote_api_v1.21.md +++ b/docs/reference/api/docker_remote_api_v1.21.md @@ -298,7 +298,7 @@ Json Parameters: systems, such as SELinux. - **LogConfig** - Log configuration for the container, specified as a JSON object in the form `{ "Type": "", "Config": {"key1": "val1"}}`. - Available types: `json-file`, `syslog`, `journald`, `gelf`, `none`. + Available types: `json-file`, `syslog`, `journald`, `gelf`, `awslogs`, `none`. `json-file` logging driver. - **CgroupParent** - Path to `cgroups` under which the container's `cgroup` is created. If the path is not absolute, the path is considered to be relative to the `cgroups` path of the init process. Cgroups are created if they do not already exist. - **VolumeDriver** - Driver that this container users to mount volumes. diff --git a/docs/reference/logging/awslogs.md b/docs/reference/logging/awslogs.md new file mode 100644 index 0000000000..4100fd1086 --- /dev/null +++ b/docs/reference/logging/awslogs.md @@ -0,0 +1,90 @@ + + +# Amazon CloudWatch Logs logging driver + +The `awslogs` logging driver sends container logs to +[Amazon CloudWatch Logs](https://aws.amazon.com/cloudwatch/details/#log-monitoring). +Log entries can be retrieved through the [AWS Management +Console](https://console.aws.amazon.com/cloudwatch/home#logs:) or the [AWS SDKs +and Command Line Tools](http://docs.aws.amazon.com/cli/latest/reference/logs/index.html). + +## Usage + +You can configure the default logging driver by passing the `--log-driver` +option to the Docker daemon: + + docker --log-driver=awslogs + +You can set the logging driver for a specific container by using the +`--log-driver` option to `docker run`: + + docker run --log-driver=awslogs ... + +## Amazon CloudWatch Logs options + +You can use the `--log-opt NAME=VALUE` flag to specify Amazon CloudWatch Logs logging driver options. + +### awslogs-region + +You must specify a region for the `awslogs` logging driver. You can specify the +region with either the `awslogs-region` log option or `AWS_REGION` environment +variable: + + docker run --log-driver=awslogs --log-opt awslogs-region=us-east-1 ... + +### awslogs-group + +You must specify a +[log group](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/WhatIsCloudWatchLogs.html) +for the `awslogs` logging driver. You can specify the log group with the +`awslogs-group` log option: + + docker run --log-driver=awslogs --log-opt awslogs-region=us-east-1 --log-opt awslogs-group=myLogGroup ... + +### awslogs-stream + +To configure which +[log stream](http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/WhatIsCloudWatchLogs.html) +should be used, you can specify the `awslogs-stream` log option. If not +specified, the container ID is used as the log stream. + +> **Note:** +> Log streams within a given log group should only be used by one container +> at a time. Using the same log stream for multiple containers concurrently +> can cause reduced logging performance. + +## Credentials + +You must provide AWS credentials to the Docker daemon to use the `awslogs` +logging driver. You can provide these credentials with the `AWS_ACCESS_KEY_ID`, +`AWS_SECRET_ACCESS_KEY`, and `AWS_SESSION_TOKEN` environment variables, the +default AWS shared credentials file (`~/.aws/credentials` of the root user), or +(if you are running the Docker daemon on an Amazon EC2 instance) the Amazon EC2 +instance profile. + +Credentials must have a policy applied that allows the `logs:CreateLogStream` +and `logs:PutLogEvents` actions, as shown in the following example. + + { + "Version": "2012-10-17", + "Statement": [ + { + "Action": [ + "logs:CreateLogStream", + "logs:PutLogEvents" + ], + "Effect": "Allow", + "Resource": "*" + } + ] + } + + diff --git a/docs/reference/logging/index.md b/docs/reference/logging/index.md index 7fd9245dc5..7a066e019c 100644 --- a/docs/reference/logging/index.md +++ b/docs/reference/logging/index.md @@ -15,4 +15,5 @@ weight=8 * [Configuring logging drivers](overview) * [Fluentd logging driver](fluentd) -* [Journald logging driver](journald) \ No newline at end of file +* [Journald logging driver](journald) +* [Amazon CloudWatch Logs logging driver](awslogs) diff --git a/docs/reference/logging/overview.md b/docs/reference/logging/overview.md index 423fbfcafe..b2147e12f1 100644 --- a/docs/reference/logging/overview.md +++ b/docs/reference/logging/overview.md @@ -23,6 +23,7 @@ container's logging driver. The following options are supported: | `journald` | Journald logging driver for Docker. Writes log messages to `journald`. | | `gelf` | Graylog Extended Log Format (GELF) logging driver for Docker. Writes log messages to a GELF endpoint likeGraylog or Logstash. | | `fluentd` | Fluentd logging driver for Docker. Writes log messages to `fluentd` (forward input). | +| `awslogs` | Amazon CloudWatch Logs logging driver for Docker. Writes log messages to Amazon CloudWatch Logs. | The `docker logs`command is available only for the `json-file` logging driver. @@ -128,3 +129,15 @@ For example, to specify both additional options: If container cannot connect to the Fluentd daemon on the specified address, the container stops immediately. For detailed information on working with this logging driver, see [the fluentd logging driver](/reference/logging/fluentd/) + +## Specify Amazon CloudWatch Logs options + +The Amazon CloudWatch Logs logging driver supports the following options: + + --log-opt awslogs-region= + --log-opt awslogs-group= + --log-opt awslogs-stream= + + +For detailed information on working with this logging driver, see [the awslogs logging driver](/reference/logging/awslogs/) +reference documentation. diff --git a/docs/reference/run.md b/docs/reference/run.md index bcb4ed32e2..ced540d2a2 100644 --- a/docs/reference/run.md +++ b/docs/reference/run.md @@ -1011,6 +1011,7 @@ container's logging driver. The following options are supported: | `journald` | Journald logging driver for Docker. Writes log messages to `journald`. | | `gelf` | Graylog Extended Log Format (GELF) logging driver for Docker. Writes log messages to a GELF endpoint likeGraylog or Logstash. | | `fluentd` | Fluentd logging driver for Docker. Writes log messages to `fluentd` (forward input). | +| `awslogs` | Amazon CloudWatch Logs logging driver for Docker. Writes log messages to Amazon CloudWatch Logs | The `docker logs`command is available only for the `json-file` logging driver. For detailed information on working with logging drivers, see diff --git a/man/docker-create.1.md b/man/docker-create.1.md index 13ea642441..9385c882be 100644 --- a/man/docker-create.1.md +++ b/man/docker-create.1.md @@ -168,7 +168,7 @@ millions of trillions. Add link to another container in the form of :alias or just in which case the alias will match the name. -**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*none*" +**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*awslogs*|*none*" Logging driver for container. Default is defined by daemon `--log-driver` flag. **Warning**: `docker logs` command works only for `json-file` logging driver. diff --git a/man/docker-run.1.md b/man/docker-run.1.md index 7fb9d0f162..0bb339d34e 100644 --- a/man/docker-run.1.md +++ b/man/docker-run.1.md @@ -268,7 +268,7 @@ which interface and port to use. **--lxc-conf**=[] (lxc exec-driver only) Add custom lxc options --lxc-conf="lxc.cgroup.cpuset.cpus = 0,1" -**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*none*" +**--log-driver**="|*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*awslogs*|*none*" Logging driver for container. Default is defined by daemon `--log-driver` flag. **Warning**: `docker logs` command works only for `json-file` logging driver. diff --git a/man/docker.1.md b/man/docker.1.md index d6bba74eec..42410c4d91 100644 --- a/man/docker.1.md +++ b/man/docker.1.md @@ -119,7 +119,7 @@ unix://[/path/to/socket] to use. **--label**="[]" Set key=value labels to the daemon (displayed in `docker info`) -**--log-driver**="*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*none*" +**--log-driver**="*json-file*|*syslog*|*journald*|*gelf*|*fluentd*|*awslogs*|*none*" Default driver for container logs. Default is `json-file`. **Warning**: `docker logs` command works only for `json-file` logging driver.