Remove statsd from Boulder (#2752)

This removes the config and code to output to statsd.

- Change `cmd.StatsAndLogging` to output a `Scope`, not a `Statter`.
- Remove the prefixing of component name (e.g. "VA") in front of stats; this was stripped by `autoProm` but now no longer needs to be.
- Delete vendored statsd client.
- Delete `MockStatter` (generated by gomock) and `mocks.Statter` (hand generated) in favor of mocking `metrics.Scope`, which is the interface we now use everywhere.
- Remove a few unused methods on `metrics.Scope`, and update its generated mock.
- Refactor `autoProm` and add `autoRegisterer`, which can be included in a `metrics.Scope`, avoiding global state. `autoProm` now registers everything with the `prometheus.Registerer` it is given.
- Change va_test.go's `setup()` to not return a stats object; instead the individual tests that care about stats override `va.stats` directly.

Fixes #2639, #2733.
This commit is contained in:
Jacob Hoffman-Andrews 2017-05-15 07:19:54 -07:00 committed by Daniel McCarney
parent 65f1242ed4
commit b17b5c72a6
50 changed files with 185 additions and 1304 deletions

5
Godeps/Godeps.json generated
View File

@ -10,11 +10,6 @@
"ImportPath": "github.com/beorn7/perks/quantile",
"Rev": "3ac7bf7a47d159a033b107610db8a1b6575507a4"
},
{
"ImportPath": "github.com/cactus/go-statsd-client/statsd",
"Comment": "v2.0.2",
"Rev": "26a31053503dcd96f51363a90eb03e0b32406e26"
},
{
"ImportPath": "github.com/cloudflare/cfssl/auth",
"Comment": "1.2.0-117-g6689808",

View File

@ -59,14 +59,11 @@ type config struct {
Features map[string]bool
}
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
}
func setupContext(c config) (core.RegistrationAuthority, blog.Logger, *gorp.DbMap, core.StorageAuthority, metrics.Scope) {
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "AdminRevoker")
scope, logger := cmd.StatsAndLogging(c.Syslog)
var tls *tls.Config
var err error

View File

@ -22,7 +22,6 @@ import (
"github.com/letsencrypt/boulder/features"
"github.com/letsencrypt/boulder/goodkey"
bgrpc "github.com/letsencrypt/boulder/grpc"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/policy"
sapb "github.com/letsencrypt/boulder/sa/proto"
)
@ -34,8 +33,6 @@ type config struct {
PA cmd.PAConfig
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
Common struct {
@ -140,8 +137,7 @@ func main() {
err = features.Set(c.CA.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "CA")
scope, logger := cmd.StatsAndLogging(c.Syslog)
defer logger.AuditPanic()
logger.Info(cmd.VersionString(clientName))

View File

@ -12,7 +12,6 @@ import (
"github.com/letsencrypt/boulder/core"
"github.com/letsencrypt/boulder/features"
bgrpc "github.com/letsencrypt/boulder/grpc"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/publisher"
pubPB "github.com/letsencrypt/boulder/publisher/proto"
sapb "github.com/letsencrypt/boulder/sa/proto"
@ -28,8 +27,6 @@ type config struct {
Features map[string]bool
}
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
Common struct {
@ -54,8 +51,7 @@ func main() {
err = features.Set(c.Publisher.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "Publisher")
scope, logger := cmd.StatsAndLogging(c.Syslog)
defer logger.AuditPanic()
logger.Info(cmd.VersionString(clientName))

View File

@ -18,7 +18,6 @@ import (
"github.com/letsencrypt/boulder/features"
"github.com/letsencrypt/boulder/goodkey"
bgrpc "github.com/letsencrypt/boulder/grpc"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/policy"
pubPB "github.com/letsencrypt/boulder/publisher/proto"
"github.com/letsencrypt/boulder/ra"
@ -76,8 +75,6 @@ type config struct {
PA cmd.PAConfig
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
Common struct {
@ -102,8 +99,7 @@ func main() {
err = features.Set(c.RA.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "RA")
scope, logger := cmd.StatsAndLogging(c.Syslog)
defer logger.AuditPanic()
logger.Info(cmd.VersionString(clientName))

View File

@ -11,7 +11,6 @@ import (
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/features"
bgrpc "github.com/letsencrypt/boulder/grpc"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/sa"
sapb "github.com/letsencrypt/boulder/sa/proto"
)
@ -26,8 +25,6 @@ type config struct {
Features map[string]bool
}
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
}
@ -46,8 +43,7 @@ func main() {
err = features.Set(c.SA.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "SA")
scope, logger := cmd.StatsAndLogging(c.Syslog)
defer logger.AuditPanic()
logger.Info(cmd.VersionString(clientName))

View File

@ -12,7 +12,6 @@ import (
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/features"
bgrpc "github.com/letsencrypt/boulder/grpc"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/va"
)
@ -43,8 +42,6 @@ type config struct {
Features map[string]bool
}
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
Common struct {
@ -69,8 +66,7 @@ func main() {
err = features.Set(c.VA.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "VA")
scope, logger := cmd.StatsAndLogging(c.Syslog)
defer logger.AuditPanic()
logger.Info(cmd.VersionString(clientName))

View File

@ -57,8 +57,6 @@ type config struct {
Features map[string]bool
}
Statsd cmd.StatsdConfig
SubscriberAgreementURL string
Syslog cmd.SyslogConfig
@ -103,8 +101,7 @@ func main() {
err = features.Set(c.WFE.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "WFE")
scope, logger := cmd.StatsAndLogging(c.Syslog)
defer logger.AuditPanic()
logger.Info(cmd.VersionString(clientName))

View File

@ -15,6 +15,7 @@ import (
"time"
"github.com/jmhodges/clock"
"github.com/prometheus/client_golang/prometheus"
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/core"
@ -96,7 +97,7 @@ type certChecker struct {
rMu *sync.Mutex
issuedReport report
checkPeriod time.Duration
stats metrics.Statter
stats metrics.Scope
}
func newChecker(saDbMap certDB, clk clock.Clock, pa core.PolicyAuthority, period time.Duration) certChecker {
@ -297,8 +298,6 @@ func main() {
err = features.Set(config.CertChecker.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, err := metrics.NewStatter(config.Statsd.Server, config.Statsd.Prefix)
cmd.FailOnError(err, "Failed to create StatsD client")
syslogger, err := syslog.Dial("", "", syslog.LOG_INFO|syslog.LOG_LOCAL0, "")
cmd.FailOnError(err, "Failed to dial syslog")
logger, err := blog.New(syslogger, 0, 0)
@ -323,7 +322,8 @@ func main() {
cmd.FailOnError(err, "Couldn't load DB URL")
saDbMap, err := sa.NewDbMap(saDbURL, config.CertChecker.DBConfig.MaxDBConns)
cmd.FailOnError(err, "Could not connect to database")
go sa.ReportDbConnCount(saDbMap, metrics.NewStatsdScope(stats, "CertChecker"))
scope := metrics.NewPromScope(prometheus.DefaultRegisterer)
go sa.ReportDbConnCount(saDbMap, scope)
pa, err := policy.New(config.PA.Challenges)
cmd.FailOnError(err, "Failed to create PA")
@ -353,7 +353,7 @@ func main() {
go func() {
s := checker.clock.Now()
checker.processCerts(wg, config.CertChecker.BadResultsOnly)
stats.TimingDuration("certChecker.processingLatency", time.Since(s), 1.0)
scope.TimingDuration("certChecker.processingLatency", time.Since(s))
}()
}
wg.Wait()

View File

@ -133,7 +133,7 @@ func main() {
os.Exit(1)
}
_, log := cmd.StatsAndLogging(cmd.StatsdConfig{}, cmd.SyslogConfig{StdoutLevel: 7})
_, log := cmd.StatsAndLogging(cmd.SyslogConfig{StdoutLevel: 7})
configData, err := ioutil.ReadFile(*configFile)
cmd.FailOnError(err, fmt.Sprintf("Reading %q", *configFile))

View File

@ -384,8 +384,6 @@ type config struct {
Features map[string]bool
}
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
}
@ -409,8 +407,7 @@ func main() {
err = features.Set(c.Mailer.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "Expiration")
scope, logger := cmd.StatsAndLogging(c.Syslog)
defer logger.AuditPanic()
logger.Info(cmd.VersionString(clientName))

View File

@ -26,6 +26,7 @@ import (
berrors "github.com/letsencrypt/boulder/errors"
blog "github.com/letsencrypt/boulder/log"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/metrics/mock_metrics"
"github.com/letsencrypt/boulder/mocks"
"github.com/letsencrypt/boulder/sa"
"github.com/letsencrypt/boulder/sa/satest"
@ -432,9 +433,8 @@ func TestFindCertsAtCapacity(t *testing.T) {
// Override the mailer `stats` with a mock
ctrl := gomock.NewController(t)
defer ctrl.Finish()
statter := metrics.NewMockStatter(ctrl)
stats := metrics.NewStatsdScope(statter, "Expiration")
testCtx.m.stats = stats
statter := mock_metrics.NewMockScope(ctrl)
testCtx.m.stats = statter
// Set the limit to 1 so we are "at capacity" with one result
testCtx.m.limit = 1
@ -443,14 +443,13 @@ func TestFindCertsAtCapacity(t *testing.T) {
// Note: this is not the 24h0m0s nag as you would expect sending time.Hour
// * 24 to setup() for the nag duration. This is because all of the nags are
// offset by defaultNagCheckInterval, which is 24hrs.
statter.EXPECT().Inc("Expiration.Errors.Nag-48h0m0s.AtCapacity",
int64(1), float32(1.0))
statter.EXPECT().Inc("Errors.Nag-48h0m0s.AtCapacity", int64(1))
// findExpiringCertificates() ends up invoking sendNags which calls
// TimingDuration so we need to EXPECT that with the mock
statter.EXPECT().TimingDuration("Expiration.SendLatency", time.Duration(0), float32(1.0))
statter.EXPECT().TimingDuration("SendLatency", time.Duration(0))
// Similarly, findExpiringCertificates() sends its latency as well
statter.EXPECT().TimingDuration("Expiration.ProcessingCertificatesLatency", time.Duration(0), float32(1.0))
statter.EXPECT().TimingDuration("ProcessingCertificatesLatency", time.Duration(0))
err := testCtx.m.findExpiringCertificates()
test.AssertNotError(t, err, "Failed to find expiring certs")

View File

@ -27,7 +27,6 @@ type eapConfig struct {
ExpiredAuthzPurger struct {
cmd.DBConfig
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
GracePeriod cmd.ConfigDuration
@ -151,8 +150,7 @@ func main() {
cmd.FailOnError(err, "Failed to set feature flags")
// Set up logging
stats, auditlogger := cmd.StatsAndLogging(config.ExpiredAuthzPurger.Statsd, config.ExpiredAuthzPurger.Syslog)
scope := metrics.NewStatsdScope(stats, "AuthzPurger")
scope, auditlogger := cmd.StatsAndLogging(config.ExpiredAuthzPurger.Syslog)
auditlogger.Info(cmd.VersionString(clientName))
defer auditlogger.AuditPanic()

View File

@ -16,7 +16,6 @@ import (
"github.com/letsencrypt/boulder/features"
blog "github.com/letsencrypt/boulder/log"
bmail "github.com/letsencrypt/boulder/mail"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/sa"
)
@ -290,7 +289,6 @@ func main() {
cmd.SMTPConfig
Features map[string]bool
}
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
}
configFile := flag.String("config", "", "File containing a JSON config.")
@ -315,8 +313,7 @@ func main() {
err = features.Set(cfg.NotifyMailer.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, log := cmd.StatsAndLogging(cfg.Statsd, cfg.Syslog)
scope := metrics.NewStatsdScope(stats, "NotificationMailer")
scope, log := cmd.StatsAndLogging(cfg.Syslog)
defer log.AuditPanic()
dbURL, err := cfg.NotifyMailer.DBConfig.URL()

View File

@ -145,8 +145,6 @@ type config struct {
Features map[string]bool
}
Statsd cmd.StatsdConfig
Syslog cmd.SyslogConfig
Common struct {
@ -173,8 +171,7 @@ as generated by Boulder's single-ocsp command.
err = features.Set(c.OCSPResponder.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, logger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "OCSPResponder")
scope, logger := cmd.StatsAndLogging(c.Syslog)
defer logger.AuditPanic()
logger.Info(cmd.VersionString("ocsp-responder"))

View File

@ -769,8 +769,7 @@ func main() {
err = features.Set(conf.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, auditlogger := cmd.StatsAndLogging(c.Statsd, c.Syslog)
scope := metrics.NewStatsdScope(stats, "OCSPUpdater")
scope, auditlogger := cmd.StatsAndLogging(c.Syslog)
defer auditlogger.AuditPanic()
auditlogger.Info(cmd.VersionString(clientName))

View File

@ -39,7 +39,6 @@ command descriptions:
`
type config struct {
Statsd cmd.StatsdConfig
TLS cmd.TLSConfig
SAService *cmd.GRPCClientConfig
Syslog cmd.SyslogConfig
@ -126,8 +125,7 @@ func setup(configFile string) (metrics.Scope, blog.Logger, core.StorageAuthority
cmd.FailOnError(err, "Failed to parse config file")
err = features.Set(conf.Features)
cmd.FailOnError(err, "Failed to set feature flags")
stats, logger := cmd.StatsAndLogging(conf.Statsd, conf.Syslog)
scope := metrics.NewStatsdScope(stats, "OrphanFinder")
scope, logger := cmd.StatsAndLogging(conf.Syslog)
var tls *tls.Config
if conf.TLS.CertFile != nil {

View File

@ -39,6 +39,7 @@ import (
cfsslLog "github.com/cloudflare/cfssl/log"
"github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/letsencrypt/boulder/core"
@ -108,15 +109,14 @@ func (log grpcLogger) Println(args ...interface{}) {
log.AuditErr(fmt.Sprintln(args...))
}
// StatsAndLogging constructs a Statter and an AuditLogger based on its config
// StatsAndLogging constructs a metrics.Scope and an AuditLogger based on its config
// parameters, and return them both. Crashes if any setup fails.
// Also sets the constructed AuditLogger as the default logger, and configures
// the cfssl, mysql, and grpc packages to use our logger.
// This must be called before any gRPC code is called, because gRPC's SetLogger
// doesn't use any locking.
func StatsAndLogging(statConf StatsdConfig, logConf SyslogConfig) (metrics.Statter, blog.Logger) {
stats, err := metrics.NewStatter(statConf.Server, statConf.Prefix)
FailOnError(err, "Couldn't connect to statsd")
func StatsAndLogging(logConf SyslogConfig) (metrics.Scope, blog.Logger) {
scope := metrics.NewPromScope(prometheus.DefaultRegisterer)
tag := path.Base(os.Args[0])
syslogger, err := syslog.Dial(
@ -137,7 +137,7 @@ func StatsAndLogging(statConf StatsdConfig, logConf SyslogConfig) (metrics.Statt
_ = mysql.SetLogger(mysqlLogger{logger})
grpclog.SetLogger(grpcLogger{logger})
return stats, logger
return scope, logger
}
// FailOnError exits and prints an error message if we encountered a problem

View File

@ -1,21 +0,0 @@
Boulder currently exports stats with both Prometheus and statsd. It is in the
middle of a transition towards Prometheus, with statsd disappearing soon.
Currently, two stat groups in particular are duplicated:
Stats starting with "Gostats_" are duplicated with the default process-level
stats exported by the Prometheus library.
Stats starting with "gRPCClient_" are duplicated by the stats generated by
the go-grpc-prometheus package.
When writing dashboards and alerts in the Prometheus world, we should be careful
to avoid these two categories, as they will disappear eventually. As a general
rule, if a stat is available with an all-lowercase name, choose that one, as it
is probably the Prometheus-native version.
In the long run we will want to create most stats using the native Prometheus
stat interface, since it allows us to use add labels to metrics, which is very
useful. For instance, currently our DNS stats distinguish types of queries by
appending the type to the stat name. This would be more natural as a label in
Prometheus.

View File

@ -112,7 +112,7 @@ type bothWriter struct {
}
// Log the provided message at the appropriate level, writing to
// both stdout and the Logger, as well as informing statsd.
// both stdout and the Logger
func (w *bothWriter) logAtLevel(level syslog.Priority, msg string) {
var prefix string
var err error

View File

@ -13,17 +13,11 @@ import (
// a Prometheus metric name
var invalidPromChars = regexp.MustCompile(`[^[:alnum:]\_:]+`)
// promAdjust adjusts a name for use by Prometheus: It strips off a single label
// of prefix (which is always the name of the service, and therefore duplicated
// by Prometheus' instance labels), and replaces "-" and "." with "_". Invalid
// metric name characters that remain (e.g. `>`) are removed.
// promAdjust adjusts a name for use by Prometheus: It and replaces "-" and "."
// with "_". Invalid metric name characters that remain (e.g. `>`) are removed.
func promAdjust(name string) string {
name = strings.Replace(name, "-", "_", -1)
labels := strings.Split(name, ".")
if len(labels) < 2 {
return invalidPromChars.ReplaceAllString(labels[0], "")
}
name = strings.Join(labels[1:], "_")
name = strings.Replace(name, ".", "_", -1)
return invalidPromChars.ReplaceAllString(name, "")
}
@ -33,6 +27,7 @@ func promAdjust(name string) string {
// same metric). It is safe for concurrent access.
type autoProm struct {
sync.RWMutex
prometheus.Registerer
metrics map[string]prometheus.Collector
}
@ -54,23 +49,34 @@ func (ap *autoProm) get(name string, make maker) prometheus.Collector {
return ap.metrics[name]
}
result = make(name)
prometheus.MustRegister(result)
ap.Registerer.MustRegister(result)
ap.metrics[name] = result
return result
}
func newAutoProm() *autoProm {
func newAutoProm(registerer prometheus.Registerer) *autoProm {
return &autoProm{
metrics: make(map[string]prometheus.Collector),
metrics: make(map[string]prometheus.Collector),
Registerer: prometheus.NewRegistry(),
}
}
var gauges = newAutoProm()
var counters = newAutoProm()
var summaries = newAutoProm()
// autoRegisterer wraps three autoProm instances, one for each type of metric
// managed by this module, and provides methods to get/make those metrics.
type autoRegisterer struct {
gauges, counters, summaries *autoProm
}
func autoGauge(name string) prometheus.Gauge {
return gauges.get(name, func(cleaned string) prometheus.Collector {
func newAutoRegisterer(registerer prometheus.Registerer) *autoRegisterer {
return &autoRegisterer{
gauges: newAutoProm(registerer),
counters: newAutoProm(registerer),
summaries: newAutoProm(registerer),
}
}
func (ar *autoRegisterer) autoGauge(name string) prometheus.Gauge {
return ar.gauges.get(name, func(cleaned string) prometheus.Collector {
return prometheus.NewGauge(prometheus.GaugeOpts{
Name: cleaned,
Help: "auto",
@ -78,8 +84,8 @@ func autoGauge(name string) prometheus.Gauge {
}).(prometheus.Gauge)
}
func autoCounter(name string) prometheus.Counter {
return counters.get(name, func(cleaned string) prometheus.Collector {
func (ar *autoRegisterer) autoCounter(name string) prometheus.Counter {
return ar.counters.get(name, func(cleaned string) prometheus.Collector {
return prometheus.NewCounter(prometheus.CounterOpts{
Name: cleaned,
Help: "auto",
@ -87,8 +93,8 @@ func autoCounter(name string) prometheus.Counter {
}).(prometheus.Counter)
}
func autoSummary(name string) prometheus.Summary {
return summaries.get(name, func(cleaned string) prometheus.Collector {
func (ar *autoRegisterer) autoSummary(name string) prometheus.Summary {
return ar.summaries.get(name, func(cleaned string) prometheus.Collector {
return prometheus.NewSummary(prometheus.SummaryOpts{
Name: cleaned,
Help: "auto",

View File

@ -10,14 +10,11 @@ func TestPromAdjust(t *testing.T) {
testCases := []struct {
input, output string
}{
{"RA.Foo.Bar", "Foo_Bar"},
{"Foo.Bar", "Foo_Bar"},
{"", ""},
{"RA-FOO-BAR", "RA_FOO_BAR"},
{"RA.FOO-BAR", "FOO_BAR"},
{"RA.FOO-BAR", "FOO_BAR"},
{"RA", "RA"},
{"RA>CA", "RACA"},
{"RA?CA!- 99 @#$%&()", "RACA_99"},
{"FOO-BAR", "FOO_BAR"},
{">CA", "CA"},
{"?CA!- 99 @#$%&()", "CA_99"},
}
for _, tc := range testCases {
t.Run(tc.input, func(t *testing.T) {
@ -36,10 +33,11 @@ func TestAutoProm(t *testing.T) {
madeGauge = prometheus.NewGauge(prometheus.GaugeOpts{Name: "hi", Help: "hi"})
return madeGauge
}
ap := newAutoProm()
registry := prometheus.NewRegistry()
ap := newAutoProm(registry)
result := ap.get("foo.bar", recorder)
if calledWithName != "bar" {
t.Errorf("expected maker function to be called with bar, got %q", calledWithName)
if calledWithName != "foo_bar" {
t.Errorf("expected maker function to be called with foo_bar, got %q", calledWithName)
}
if result != madeGauge {
t.Errorf("got back a different gauge than we made")
@ -50,3 +48,33 @@ func TestAutoProm(t *testing.T) {
t.Errorf("expected to get same result twice, got a new result")
}
}
func TestAutoRegisterer(t *testing.T) {
registry := prometheus.NewRegistry()
ap := newAutoRegisterer(registry)
gauge := ap.autoGauge("ima_stat")
if gauge == nil {
t.Fatal("gauge was nil")
}
expected := "Desc{fqName: \"ima_stat\", help: \"auto\", constLabels: {}, variableLabels: []}"
gaugeDesc := gauge.Desc().String()
if gaugeDesc != expected {
t.Errorf("gauge description: got %q, expected %q", gaugeDesc, expected)
}
counter := ap.autoCounter("ima_stat")
if counter == nil {
t.Fatal("counter was nil")
}
counterDesc := counter.Desc().String()
if counterDesc != expected {
t.Errorf("counter description: got %q, expected %q", counterDesc, expected)
}
summary := ap.autoSummary("ima_stat")
if summary == nil {
t.Fatal("summary was nil")
}
summaryDesc := summary.Desc().String()
if summaryDesc != expected {
t.Errorf("summary description: got %q, expected %q", summaryDesc, expected)
}
}

View File

@ -1,86 +0,0 @@
package metrics
import (
"fmt"
"os"
"time"
"github.com/cactus/go-statsd-client/statsd"
)
// Statter implements the statsd.Statter interface but
// appends the name of the host the process is running on
// to the end of every stat name
type Statter struct {
suffix string
s statsd.Statter
}
// NewStatter returns a new statsd.Client wrapper
func NewStatter(addr, prefix string) (Statter, error) {
host, err := os.Hostname()
if err != nil {
return Statter{}, err
}
suffix := fmt.Sprintf(".%s", host)
s, err := statsd.NewClient(addr, prefix)
if err != nil {
return Statter{}, err
}
return Statter{suffix, s}, nil
}
// Inc wraps statsd.Client.Inc
func (s Statter) Inc(n string, v int64, r float32) error {
return s.s.Inc(n+s.suffix, v, r)
}
// Dec wraps statsd.Client.Dec
func (s Statter) Dec(n string, v int64, r float32) error {
return s.s.Dec(n+s.suffix, v, r)
}
// Gauge wraps statsd.Client.Gauge
func (s Statter) Gauge(n string, v int64, r float32) error {
return s.s.Gauge(n+s.suffix, v, r)
}
// GaugeDelta wraps statsd.Client.GaugeDelta
func (s Statter) GaugeDelta(n string, v int64, r float32) error {
return s.s.GaugeDelta(n+s.suffix, v, r)
}
// Timing wraps statsd.Client.Timing
func (s Statter) Timing(n string, v int64, r float32) error {
return s.s.Timing(n+s.suffix, v, r)
}
// TimingDuration wraps statsd.Client.TimingDuration
func (s Statter) TimingDuration(n string, v time.Duration, r float32) error {
return s.s.TimingDuration(n+s.suffix, v, r)
}
// Set wraps statsd.Client.Set
func (s Statter) Set(n string, v string, r float32) error {
return s.s.Set(n+s.suffix, v, r)
}
// SetInt wraps statsd.Client.SetInt
func (s Statter) SetInt(n string, v int64, r float32) error {
return s.s.SetInt(n+s.suffix, v, r)
}
// Raw wraps statsd.Client.Raw
func (s Statter) Raw(n string, v string, r float32) error {
return s.s.Raw(n+s.suffix, v, r)
}
// SetPrefix wraps statsd.Client.SetPrefix
func (s Statter) SetPrefix(p string) {
s.s.SetPrefix(p)
}
// Close wraps statsd.Client.Close
func (s Statter) Close() error {
return s.s.Close()
}

View File

@ -1 +0,0 @@
package metrics

View File

@ -30,16 +30,6 @@ func (_m *MockScope) EXPECT() *_MockScopeRecorder {
return _m.recorder
}
func (_m *MockScope) Dec(_param0 string, _param1 int64) error {
ret := _m.ctrl.Call(_m, "Dec", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) Dec(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Dec", arg0, arg1)
}
func (_m *MockScope) Gauge(_param0 string, _param1 int64) error {
ret := _m.ctrl.Call(_m, "Gauge", _param0, _param1)
ret0, _ := ret[0].(error)
@ -84,36 +74,6 @@ func (_mr *_MockScopeRecorder) NewScope(arg0 ...interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "NewScope", arg0...)
}
func (_m *MockScope) Raw(_param0 string, _param1 string) error {
ret := _m.ctrl.Call(_m, "Raw", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) Raw(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Raw", arg0, arg1)
}
func (_m *MockScope) Scope() string {
ret := _m.ctrl.Call(_m, "Scope")
ret0, _ := ret[0].(string)
return ret0
}
func (_mr *_MockScopeRecorder) Scope() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Scope")
}
func (_m *MockScope) Set(_param0 string, _param1 string) error {
ret := _m.ctrl.Call(_m, "Set", _param0, _param1)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockScopeRecorder) Set(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Set", arg0, arg1)
}
func (_m *MockScope) SetInt(_param0 string, _param1 int64) error {
ret := _m.ctrl.Call(_m, "SetInt", _param0, _param1)
ret0, _ := ret[0].(error)

View File

@ -1,138 +0,0 @@
// Automatically generated by MockGen. DO NOT EDIT!
// Source: github.com/cactus/go-statsd-client/statsd (interfaces: Statter)
package metrics
import (
gomock "github.com/golang/mock/gomock"
time "time"
)
// Mock of Statter interface
type MockStatter struct {
ctrl *gomock.Controller
recorder *_MockStatterRecorder
}
// Recorder for MockStatter (not exported)
type _MockStatterRecorder struct {
mock *MockStatter
}
func NewMockStatter(ctrl *gomock.Controller) *MockStatter {
mock := &MockStatter{ctrl: ctrl}
mock.recorder = &_MockStatterRecorder{mock}
return mock
}
func (_m *MockStatter) EXPECT() *_MockStatterRecorder {
return _m.recorder
}
func (_m *MockStatter) Close() error {
ret := _m.ctrl.Call(_m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) Close() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Close")
}
func (_m *MockStatter) Dec(_param0 string, _param1 int64, _param2 float32) error {
ret := _m.ctrl.Call(_m, "Dec", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) Dec(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Dec", arg0, arg1, arg2)
}
func (_m *MockStatter) Gauge(_param0 string, _param1 int64, _param2 float32) error {
ret := _m.ctrl.Call(_m, "Gauge", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) Gauge(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Gauge", arg0, arg1, arg2)
}
func (_m *MockStatter) GaugeDelta(_param0 string, _param1 int64, _param2 float32) error {
ret := _m.ctrl.Call(_m, "GaugeDelta", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) GaugeDelta(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "GaugeDelta", arg0, arg1, arg2)
}
func (_m *MockStatter) Inc(_param0 string, _param1 int64, _param2 float32) error {
ret := _m.ctrl.Call(_m, "Inc", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) Inc(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Inc", arg0, arg1, arg2)
}
func (_m *MockStatter) Raw(_param0 string, _param1 string, _param2 float32) error {
ret := _m.ctrl.Call(_m, "Raw", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) Raw(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Raw", arg0, arg1, arg2)
}
func (_m *MockStatter) Set(_param0 string, _param1 string, _param2 float32) error {
ret := _m.ctrl.Call(_m, "Set", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) Set(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Set", arg0, arg1, arg2)
}
func (_m *MockStatter) SetInt(_param0 string, _param1 int64, _param2 float32) error {
ret := _m.ctrl.Call(_m, "SetInt", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) SetInt(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "SetInt", arg0, arg1, arg2)
}
func (_m *MockStatter) SetPrefix(_param0 string) {
_m.ctrl.Call(_m, "SetPrefix", _param0)
}
func (_mr *_MockStatterRecorder) SetPrefix(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "SetPrefix", arg0)
}
func (_m *MockStatter) Timing(_param0 string, _param1 int64, _param2 float32) error {
ret := _m.ctrl.Call(_m, "Timing", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) Timing(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Timing", arg0, arg1, arg2)
}
func (_m *MockStatter) TimingDuration(_param0 string, _param1 time.Duration, _param2 float32) error {
ret := _m.ctrl.Call(_m, "TimingDuration", _param0, _param1, _param2)
ret0, _ := ret[0].(error)
return ret0
}
func (_mr *_MockStatterRecorder) TimingDuration(arg0, arg1, arg2 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "TimingDuration", arg0, arg1, arg2)
}

View File

@ -1,117 +1,87 @@
//go:generate mockgen -package metrics -destination ./mock_statsd.go github.com/cactus/go-statsd-client/statsd Statter
package metrics
import (
"strings"
"time"
"github.com/cactus/go-statsd-client/statsd"
"github.com/prometheus/client_golang/prometheus"
)
// Scope is a stats collector that will prefix the name the stats it
// collects.
type Scope interface {
NewScope(scopes ...string) Scope
Scope() string
Inc(stat string, value int64) error
Dec(stat string, value int64) error
Gauge(stat string, value int64) error
GaugeDelta(stat string, value int64) error
Timing(stat string, delta int64) error
TimingDuration(stat string, delta time.Duration) error
Set(stat string, value string) error
SetInt(stat string, value int64) error
Raw(stat string, value string) error
}
// StatsdScope is a Scope that sends data to statsd with a prefix added to the
// stat names.
type StatsdScope struct {
prefix string
statter statsd.Statter
// promScope is a Scope that sends data to Prometheus
type promScope struct {
*autoRegisterer
prefix string
registerer prometheus.Registerer
}
var _ Scope = &StatsdScope{}
var _ Scope = &promScope{}
// NewStatsdScope returns a StatsdScope that prefixes stats it collections with
// the scopes given joined together by periods
func NewStatsdScope(statter statsd.Statter, scopes ...string) *StatsdScope {
return &StatsdScope{
prefix: strings.Join(scopes, ".") + ".",
statter: statter,
// NewPromScope returns a Scope that sends data to Prometheus
func NewPromScope(registerer prometheus.Registerer, scopes ...string) Scope {
return &promScope{
prefix: strings.Join(scopes, ".") + ".",
autoRegisterer: newAutoRegisterer(registerer),
}
}
// NewNoopScope returns a Scope that won't collect anything
func NewNoopScope() Scope {
c, _ := statsd.NewNoopClient()
return NewStatsdScope(c, "noop")
return NewPromScope(prometheus.NewRegistry())
}
// NewScope generates a new Scope prefixed by this Scope's prefix plus the
// prefixes given joined by periods
func (s *StatsdScope) NewScope(scopes ...string) Scope {
func (s *promScope) NewScope(scopes ...string) Scope {
scope := strings.Join(scopes, ".")
return NewStatsdScope(s.statter, s.prefix+scope)
}
// Scope returns the current string prefix (except for the final period) that
// stats will receive
func (s *StatsdScope) Scope() string {
return s.prefix[:len(s.prefix)-1]
return NewPromScope(s.registerer, s.prefix+scope)
}
// Inc increments the given stat and adds the Scope's prefix to the name
func (s *StatsdScope) Inc(stat string, value int64) error {
autoCounter(s.prefix + stat).Add(float64(1))
return s.statter.Inc(s.prefix+stat, value, 1.0)
}
// Dec decrements the given stat and adds the Scope's prefix to the name
func (s *StatsdScope) Dec(stat string, value int64) error {
return s.statter.Dec(s.prefix+stat, value, 1.0)
func (s *promScope) Inc(stat string, value int64) error {
s.autoCounter(s.prefix + stat).Add(float64(value))
return nil
}
// Gauge sends a gauge stat and adds the Scope's prefix to the name
func (s *StatsdScope) Gauge(stat string, value int64) error {
autoGauge(s.prefix + stat).Set(float64(value))
return s.statter.Gauge(s.prefix+stat, value, 1.0)
func (s *promScope) Gauge(stat string, value int64) error {
s.autoGauge(s.prefix + stat).Set(float64(value))
return nil
}
// GaugeDelta sends the change in a gauge stat and adds the Scope's prefix to the name
func (s *StatsdScope) GaugeDelta(stat string, value int64) error {
autoGauge(s.prefix + stat).Add(float64(value))
return s.statter.GaugeDelta(s.prefix+stat, value, 1.0)
func (s *promScope) GaugeDelta(stat string, value int64) error {
s.autoGauge(s.prefix + stat).Add(float64(value))
return nil
}
// Timing sends a latency stat and adds the Scope's prefix to the name
func (s *StatsdScope) Timing(stat string, delta int64) error {
autoSummary(s.prefix + stat + "_seconds").Observe(float64(delta))
return s.statter.Timing(s.prefix+stat, delta, 1.0)
func (s *promScope) Timing(stat string, delta int64) error {
s.autoSummary(s.prefix + stat + "_seconds").Observe(float64(delta))
return nil
}
// TimingDuration sends a latency stat as a time.Duration and adds the Scope's
// prefix to the name
func (s *StatsdScope) TimingDuration(stat string, delta time.Duration) error {
autoSummary(s.prefix + stat + "_seconds").Observe(delta.Seconds())
return s.statter.TimingDuration(s.prefix+stat, delta, 1.0)
}
// Set sets a stat's new value and adds the Scope's prefix to the name
func (s *StatsdScope) Set(stat string, value string) error {
return s.statter.Set(s.prefix+stat, value, 1.0)
func (s *promScope) TimingDuration(stat string, delta time.Duration) error {
s.autoSummary(s.prefix + stat + "_seconds").Observe(delta.Seconds())
return nil
}
// SetInt sets a stat's integer value and adds the Scope's prefix to the name
func (s *StatsdScope) SetInt(stat string, value int64) error {
autoGauge(s.prefix + stat).Set(float64(value))
return s.statter.SetInt(s.prefix+stat, value, 1.0)
}
// Raw sends a stat value without interpretation and adds the Scope's prefix to
// the name
func (s *StatsdScope) Raw(stat string, value string) error {
return s.statter.Raw(s.prefix+stat, value, 1.0)
func (s *promScope) SetInt(stat string, value int64) error {
s.autoGauge(s.prefix + stat).Set(float64(value))
return nil
}

View File

@ -1,57 +0,0 @@
package metrics
import (
"testing"
"time"
"github.com/golang/mock/gomock"
)
func TestScopedStatsStatsd(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
statter := NewMockStatter(ctrl)
stats := NewStatsdScope(statter, "fake")
statter.EXPECT().Inc("fake.counter", int64(2), float32(1.0)).Return(nil)
stats.Inc("counter", 2)
statter.EXPECT().Dec("fake.counter", int64(2), float32(1.0)).Return(nil)
stats.Dec("counter", 2)
statter.EXPECT().Gauge("fake.gauge", int64(2), float32(1.0)).Return(nil)
stats.Gauge("gauge", 2)
statter.EXPECT().GaugeDelta("fake.delta", int64(2), float32(1.0)).Return(nil)
stats.GaugeDelta("delta", 2)
statter.EXPECT().Timing("fake.latency", int64(2), float32(1.0)).Return(nil)
stats.Timing("latency", 2)
statter.EXPECT().TimingDuration("fake.latency", 2*time.Second, float32(1.0)).Return(nil)
stats.TimingDuration("latency", 2*time.Second)
statter.EXPECT().Set("fake.something", "value", float32(1.0)).Return(nil)
stats.Set("something", "value")
statter.EXPECT().SetInt("fake.someint", int64(10), float32(1.0)).Return(nil)
stats.SetInt("someint", 10)
statter.EXPECT().Raw("fake.raw", "raw value", float32(1.0)).Return(nil)
stats.Raw("raw", "raw value")
s := stats.NewScope("foobar")
statter.EXPECT().Inc("fake.foobar.counter", int64(3), float32(1.0)).Return(nil)
s.Inc("counter", 3)
ss := stats.NewScope("another", "level")
statter.EXPECT().Inc("fake.foobar.counter", int64(4), float32(1.0)).Return(nil)
s.Inc("counter", 4)
if stats.Scope() != "fake" {
t.Errorf(`expected "fake", got %#v`, stats.Scope())
}
if s.Scope() != "fake.foobar" {
t.Errorf(`expected "fake.foobar", got %#v`, s.Scope())
}
if ss.Scope() != "fake.another.level" {
t.Errorf(`expected "fake.foobar", got %#v`, s.Scope())
}
twoScope := NewStatsdScope(statter, "fake", "bang")
statter.EXPECT().Inc("fake.bang.counter", int64(7), float32(1.0)).Return(nil)
twoScope.Inc("counter", 7)
}

View File

@ -13,7 +13,6 @@ import (
"strconv"
"time"
"github.com/cactus/go-statsd-client/statsd"
"github.com/jmhodges/clock"
"github.com/miekg/dns"
"golang.org/x/net/context"
@ -389,44 +388,6 @@ func (*Publisher) SubmitToSingleCT(_ context.Context, _, _ string, _ []byte) err
return nil
}
// Statter is a stat counter that is a no-op except for locally handling Inc
// calls (which are most of what we use).
type Statter struct {
statsd.NoopClient
Counters map[string]int64
TimingDurationCalls []TimingDuration
}
// TimingDuration records a statsd call to TimingDuration.
type TimingDuration struct {
Metric string
Duration time.Duration
Rate float32
}
// Inc increments the indicated metric by the indicated value, in the Counters
// map maintained by the statter
func (s *Statter) Inc(metric string, value int64, rate float32) error {
s.Counters[metric] += value
return nil
}
// TimingDuration stores the parameters in the LastTimingDuration field of the
// MockStatter.
func (s *Statter) TimingDuration(metric string, delta time.Duration, rate float32) error {
s.TimingDurationCalls = append(s.TimingDurationCalls, TimingDuration{
Metric: metric,
Duration: delta,
Rate: rate,
})
return nil
}
// NewStatter returns an empty statter with all counters zero
func NewStatter() *Statter {
return &Statter{statsd.NoopClient{}, map[string]int64{}, nil}
}
// Mailer is a mock
type Mailer struct {
Messages []MailerMessage

View File

@ -212,7 +212,7 @@ fi
if [[ "$RUN" =~ "errcheck" ]] ; then
start_context "errcheck"
run_and_expect_silence errcheck \
-ignore io:Write,os:Remove,net/http:Write,github.com/letsencrypt/boulder/metrics:.*,github.com/letsencrypt/boulder/vendor/github.com/cactus/go-statsd-client/statsd:.* \
-ignore io:Write,os:Remove,net/http:Write,github.com/letsencrypt/boulder/metrics:.* \
$(echo ${TESTPATHS} | tr ' ' '\n' | grep -v test)
end_context #errcheck
fi

View File

@ -17,11 +17,6 @@
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -143,11 +143,6 @@
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -18,11 +18,6 @@
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -24,11 +24,6 @@
"frequency": "1h"
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -10,11 +10,6 @@
"debugAddr": ":8005"
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -39,11 +39,6 @@
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -3,11 +3,6 @@
"stdoutlevel": 7
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"tls": {
"caCertFile": "test/grpc-creds/minica.pem",
"certFile": "test/grpc-creds/orphan-finder.boulder/cert.pem",

View File

@ -21,11 +21,6 @@
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -55,11 +55,6 @@
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -28,11 +28,6 @@
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -33,11 +33,6 @@
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -38,11 +38,6 @@
}
},
"statsd": {
"server": "localhost:8125",
"prefix": "Boulder"
},
"syslog": {
"stdoutlevel": 6,
"sysloglevel": 4

View File

@ -244,7 +244,7 @@ func TestHTTP(t *testing.T) {
if badPort == 65536 {
badPort = goodPort - 1
}
va, _, log := setup()
va, log := setup()
va.httpPort = badPort
_, prob := va.validateHTTP01(ctx, ident, chall)
@ -333,7 +333,7 @@ func TestHTTPRedirectLookup(t *testing.T) {
defer hs.Close()
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, log := setup()
va, log := setup()
va.httpPort = port
setChallengeToken(&chall, pathMoved)
@ -397,7 +397,7 @@ func TestHTTPRedirectLoop(t *testing.T) {
defer hs.Close()
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, _ := setup()
va, _ := setup()
va.httpPort = port
_, prob := va.validateHTTP01(ctx, ident, chall)
@ -414,7 +414,7 @@ func TestHTTPRedirectUserAgent(t *testing.T) {
defer hs.Close()
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, _ := setup()
va, _ := setup()
va.userAgent = rejectUserAgent
va.httpPort = port
@ -454,7 +454,7 @@ func TestTLSSNI01(t *testing.T) {
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, log := setup()
va, log := setup()
va.tlsPort = port
_, prob := va.validateTLSSNI01(ctx, ident, chall)
@ -531,7 +531,7 @@ func TestTLSSNI02(t *testing.T) {
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, log := setup()
va, log := setup()
va.tlsPort = port
_, prob := va.validateTLSSNI02(ctx, ident, chall)
@ -618,7 +618,7 @@ func TestTLSError(t *testing.T) {
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, _ := setup()
va, _ := setup()
va.tlsPort = port
_, prob := va.validateTLSSNI01(ctx, ident, chall)
@ -702,7 +702,7 @@ func TestSNIErrInvalidChain(t *testing.T) {
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, _ := setup()
va, _ := setup()
va.tlsPort = port
// Validate the SNI challenge with the test server, expecting it to fail
@ -726,7 +726,7 @@ func TestValidateHTTP(t *testing.T) {
hs := httpSrv(t, chall.Token)
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, _ := setup()
va, _ := setup()
va.httpPort = port
defer hs.Close()
@ -763,7 +763,7 @@ func TestValidateTLSSNI01(t *testing.T) {
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, _ := setup()
va, _ := setup()
va.tlsPort = port
_, prob := va.validateChallenge(ctx, ident, chall)
@ -772,7 +772,7 @@ func TestValidateTLSSNI01(t *testing.T) {
}
func TestValidateTLSSNI01NotSane(t *testing.T) {
va, _, _ := setup()
va, _ := setup()
chall := createChallenge(core.ChallengeTypeTLSSNI01)
@ -784,7 +784,7 @@ func TestValidateTLSSNI01NotSane(t *testing.T) {
}
func TestCAATimeout(t *testing.T) {
va, _, _ := setup()
va, _ := setup()
err := va.checkCAA(ctx, core.AcmeIdentifier{Type: core.IdentifierDNS, Value: "caa-timeout.com"})
if err.Type != probs.ConnectionProblem {
t.Errorf("Expected timeout error type %s, got %s", probs.ConnectionProblem, err.Type)
@ -826,7 +826,7 @@ func TestCAAChecking(t *testing.T) {
{"unsatisfiable.com", true, false},
}
va, _, _ := setup()
va, _ := setup()
for _, caaTest := range tests {
present, valid, err := va.checkCAARecords(ctx, core.AcmeIdentifier{Type: "dns", Value: caaTest.Domain})
if err != nil {
@ -862,15 +862,30 @@ func TestCAAChecking(t *testing.T) {
}
func TestPerformValidationInvalid(t *testing.T) {
va, stats, _ := setup()
va, _ := setup()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockScope := mock_metrics.NewMockScope(ctrl)
va.stats = mockScope
mockScope.EXPECT().TimingDuration("Validations.dns-01.invalid", gomock.Any()).Return(nil)
mockScope.EXPECT().Inc(gomock.Any(), gomock.Any()).Return(nil)
chalDNS := createChallenge(core.ChallengeTypeDNS01)
_, prob := va.PerformValidation(context.Background(), "foo.com", chalDNS, core.Authorization{})
test.Assert(t, prob != nil, "validation succeeded")
test.AssertEquals(t, stats.TimingDurationCalls[0].Metric, "VA.Validations.dns-01.invalid")
}
func TestDNSValidationEmpty(t *testing.T) {
va, stats, _ := setup()
va, _ := setup()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockScope := mock_metrics.NewMockScope(ctrl)
va.stats = mockScope
mockScope.EXPECT().TimingDuration("Validations.dns-01.invalid", gomock.Any()).Return(nil)
mockScope.EXPECT().Inc(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
chalDNS := createChallenge(core.ChallengeTypeDNS01)
_, prob := va.PerformValidation(
context.Background(),
@ -878,22 +893,28 @@ func TestDNSValidationEmpty(t *testing.T) {
chalDNS,
core.Authorization{})
test.AssertEquals(t, prob.Error(), "urn:acme:error:unauthorized :: No TXT records found for DNS challenge")
test.AssertEquals(t, stats.TimingDurationCalls[0].Metric, "VA.Validations.dns-01.invalid")
}
func TestPerformValidationValid(t *testing.T) {
va, stats, _ := setup()
va, _ := setup()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockScope := mock_metrics.NewMockScope(ctrl)
va.stats = mockScope
mockScope.EXPECT().TimingDuration("Validations.dns-01.valid", gomock.Any()).Return(nil)
mockScope.EXPECT().Inc(gomock.Any(), gomock.Any()).Return(nil)
// create a challenge with well known token
chalDNS := core.DNSChallenge01()
chalDNS.Token = expectedToken
chalDNS.ProvidedKeyAuthorization = expectedKeyAuthorization
_, prob := va.PerformValidation(context.Background(), "good-dns01.com", chalDNS, core.Authorization{})
test.Assert(t, prob == nil, fmt.Sprintf("validation failed: %#v", prob))
test.AssertEquals(t, stats.TimingDurationCalls[0].Metric, "VA.Validations.dns-01.valid")
}
func TestDNSValidationFailure(t *testing.T) {
va, _, _ := setup()
va, _ := setup()
chalDNS := createChallenge(core.ChallengeTypeDNS01)
@ -911,7 +932,7 @@ func TestDNSValidationInvalid(t *testing.T) {
chalDNS := core.DNSChallenge01()
chalDNS.ProvidedKeyAuthorization = expectedKeyAuthorization
va, _, _ := setup()
va, _ := setup()
_, prob := va.validateChallenge(ctx, notDNS, chalDNS)
@ -919,7 +940,7 @@ func TestDNSValidationInvalid(t *testing.T) {
}
func TestDNSValidationNotSane(t *testing.T) {
va, _, _ := setup()
va, _ := setup()
chal0 := core.DNSChallenge01()
chal0.Token = ""
@ -950,7 +971,7 @@ func TestDNSValidationNotSane(t *testing.T) {
}
func TestDNSValidationServFail(t *testing.T) {
va, _, _ := setup()
va, _ := setup()
chalDNS := createChallenge(core.ChallengeTypeDNS01)
@ -964,7 +985,7 @@ func TestDNSValidationServFail(t *testing.T) {
}
func TestDNSValidationNoServer(t *testing.T) {
va, _, _ := setup()
va, _ := setup()
va.dnsResolver = bdns.NewTestDNSResolverImpl(
time.Second*5,
nil,
@ -980,7 +1001,7 @@ func TestDNSValidationNoServer(t *testing.T) {
}
func TestDNSValidationOK(t *testing.T) {
va, _, _ := setup()
va, _ := setup()
// create a challenge with well known token
chalDNS := core.DNSChallenge01()
@ -998,7 +1019,7 @@ func TestDNSValidationOK(t *testing.T) {
}
func TestDNSValidationNoAuthorityOK(t *testing.T) {
va, _, _ := setup()
va, _ := setup()
// create a challenge with well known token
chalDNS := core.DNSChallenge01()
@ -1024,7 +1045,7 @@ func TestCAAFailure(t *testing.T) {
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, _ := setup()
va, _ := setup()
va.tlsPort = port
ident.Value = "reserved.com"
@ -1040,7 +1061,7 @@ func TestLimitedReader(t *testing.T) {
hs := httpSrv(t, "01234567890123456789012345678901234567890123456789012345678901234567890123456789")
port, err := getPort(hs)
test.AssertNotError(t, err, "failed to get test server port")
va, _, _ := setup()
va, _ := setup()
va.httpPort = port
defer hs.Close()
@ -1052,9 +1073,7 @@ func TestLimitedReader(t *testing.T) {
"Expected failure due to truncation")
}
func setup() (*ValidationAuthorityImpl, *mocks.Statter, *blog.Mock) {
stats := mocks.NewStatter()
scope := metrics.NewStatsdScope(stats, "VA")
func setup() (*ValidationAuthorityImpl, *blog.Mock) {
logger := blog.NewMock()
va := NewValidationAuthorityImpl(
&cmd.PortConfig{},
@ -1063,18 +1082,16 @@ func setup() (*ValidationAuthorityImpl, *mocks.Statter, *blog.Mock) {
&bdns.MockDNSResolver{},
"user agent 1.0",
"letsencrypt.org",
scope,
metrics.NewNoopScope(),
clock.Default(),
logger)
return va, stats, logger
return va, logger
}
func TestCheckCAAFallback(t *testing.T) {
testSrv := httptest.NewServer(http.HandlerFunc(mocks.GPDNSHandler))
defer testSrv.Close()
stats := mocks.NewStatter()
scope := metrics.NewStatsdScope(stats, "VA")
logger := blog.NewMock()
caaDR, err := cdr.New(metrics.NewNoopScope(), time.Second, 1, nil, blog.NewMock())
test.AssertNotError(t, err, "Failed to create CAADistributedResolver")
@ -1087,7 +1104,7 @@ func TestCheckCAAFallback(t *testing.T) {
&bdns.MockDNSResolver{},
"user agent 1.0",
"ca.com",
scope,
metrics.NewNoopScope(),
clock.Default(),
logger)
@ -1206,7 +1223,7 @@ func TestAvailableAddresses(t *testing.T) {
func TestFallbackDialer(t *testing.T) {
// Create a test VA
va, _, _ := setup()
va, _ := setup()
// Create a new challenge to use for the httpSrv
chall := core.HTTPChallenge01()
@ -1265,7 +1282,7 @@ func TestFallbackDialer(t *testing.T) {
func TestFallbackTLS(t *testing.T) {
// Create a test VA
va, _, _ := setup()
va, _ := setup()
// Create a new challenge to use for the httpSrv
chall := createChallenge(core.ChallengeTypeTLSSNI01)

View File

@ -1,19 +0,0 @@
Copyright (c) 2012-2015 Eli Janssen
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -1,242 +0,0 @@
package statsd
import (
"bytes"
"math/rand"
"strconv"
"sync"
"time"
)
var bufPool = &sync.Pool{New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 128))
}}
func getBuffer() *bytes.Buffer {
buf := bufPool.Get().(*bytes.Buffer)
return buf
}
func putBuffer(buf *bytes.Buffer) {
buf.Reset()
bufPool.Put(buf)
return
}
type Statter interface {
Inc(string, int64, float32) error
Dec(string, int64, float32) error
Gauge(string, int64, float32) error
GaugeDelta(string, int64, float32) error
Timing(string, int64, float32) error
TimingDuration(string, time.Duration, float32) error
Set(string, string, float32) error
SetInt(string, int64, float32) error
Raw(string, string, float32) error
SetPrefix(string)
Close() error
}
type Client struct {
// prefix for statsd name
prefix string
// packet sender
sender Sender
}
// Close closes the connection and cleans up.
func (s *Client) Close() error {
if s == nil {
return nil
}
err := s.sender.Close()
return err
}
// Increments a statsd count type.
// stat is a string name for the metric.
// value is the integer value
// rate is the sample rate (0.0 to 1.0)
func (s *Client) Inc(stat string, value int64, rate float32) error {
if !s.includeStat(rate) {
return nil
}
dap := strconv.FormatInt(value, 10)
return s.submit(stat, dap, "|c", rate)
}
// Decrements a statsd count type.
// stat is a string name for the metric.
// value is the integer value.
// rate is the sample rate (0.0 to 1.0).
func (s *Client) Dec(stat string, value int64, rate float32) error {
if !s.includeStat(rate) {
return nil
}
dap := strconv.FormatInt(-value, 10)
return s.submit(stat, dap, "|c", rate)
}
// Submits/Updates a statsd gauge type.
// stat is a string name for the metric.
// value is the integer value.
// rate is the sample rate (0.0 to 1.0).
func (s *Client) Gauge(stat string, value int64, rate float32) error {
if !s.includeStat(rate) {
return nil
}
dap := strconv.FormatInt(value, 10)
return s.submit(stat, dap, "|g", rate)
}
// Submits a delta to a statsd gauge.
// stat is the string name for the metric.
// value is the (positive or negative) change.
// rate is the sample rate (0.0 to 1.0).
func (s *Client) GaugeDelta(stat string, value int64, rate float32) error {
if !s.includeStat(rate) {
return nil
}
prefix := ""
if value >= 0 {
prefix = "+"
}
dap := prefix + strconv.FormatInt(value, 10)
return s.submit(stat, dap, "|g", rate)
}
// Submits a statsd timing type.
// stat is a string name for the metric.
// delta is the time duration value in milliseconds
// rate is the sample rate (0.0 to 1.0).
func (s *Client) Timing(stat string, delta int64, rate float32) error {
if !s.includeStat(rate) {
return nil
}
dap := strconv.FormatInt(delta, 10)
return s.submit(stat, dap, "|ms", rate)
}
// Submits a statsd timing type.
// stat is a string name for the metric.
// delta is the timing value as time.Duration
// rate is the sample rate (0.0 to 1.0).
func (s *Client) TimingDuration(stat string, delta time.Duration, rate float32) error {
if !s.includeStat(rate) {
return nil
}
ms := float64(delta) / float64(time.Millisecond)
//dap := fmt.Sprintf("%.02f|ms", ms)
dap := strconv.FormatFloat(ms, 'f', -1, 64)
return s.submit(stat, dap, "|ms", rate)
}
// Submits a stats set type
// stat is a string name for the metric.
// value is the string value
// rate is the sample rate (0.0 to 1.0).
func (s *Client) Set(stat string, value string, rate float32) error {
if !s.includeStat(rate) {
return nil
}
return s.submit(stat, value, "|s", rate)
}
// Submits a number as a stats set type.
// stat is a string name for the metric.
// value is the integer value
// rate is the sample rate (0.0 to 1.0).
func (s *Client) SetInt(stat string, value int64, rate float32) error {
if !s.includeStat(rate) {
return nil
}
dap := strconv.FormatInt(value, 10)
return s.submit(stat, dap, "|s", rate)
}
// Raw submits a preformatted value.
// stat is the string name for the metric.
// value is a preformatted "raw" value string.
// rate is the sample rate (0.0 to 1.0).
func (s *Client) Raw(stat string, value string, rate float32) error {
if !s.includeStat(rate) {
return nil
}
return s.submit(stat, value, "", rate)
}
// submit an already sampled raw stat
func (s *Client) submit(stat, value, suffix string, rate float32) error {
if s == nil {
return nil
}
data := getBuffer()
defer putBuffer(data)
if s.prefix != "" {
data.WriteString(s.prefix)
data.WriteString(".")
}
data.WriteString(stat)
data.WriteString(":")
data.WriteString(value)
if suffix != "" {
data.WriteString(suffix)
}
if rate < 1 {
data.WriteString("|@")
data.WriteString(strconv.FormatFloat(float64(rate), 'f', 6, 32))
}
_, err := s.sender.Send(data.Bytes())
return err
}
// check for nil client, and perform sampling calculation
func (s *Client) includeStat(rate float32) bool {
if s == nil {
return false
}
if rate < 1 {
if rand.Float32() < rate {
return true
}
return false
}
return true
}
// Sets/Updates the statsd client prefix.
func (s *Client) SetPrefix(prefix string) {
if s == nil {
return
}
s.prefix = prefix
}
// Returns a pointer to a new Client, and an error.
//
// addr is a string of the format "hostname:port", and must be parsable by
// net.ResolveUDPAddr.
//
// prefix is the statsd client prefix. Can be "" if no prefix is desired.
func NewClient(addr, prefix string) (Statter, error) {
sender, err := NewSimpleSender(addr)
if err != nil {
return nil, err
}
client := &Client{
prefix: prefix,
sender: sender,
}
return client, nil
}
// Compatibility alias
var Dial = New
var New = NewClient

View File

@ -1,42 +0,0 @@
package statsd
import "time"
// Return a new BufferedClient
//
// addr is a string of the format "hostname:port", and must be parsable by
// net.ResolveUDPAddr.
//
// prefix is the statsd client prefix. Can be "" if no prefix is desired.
//
// flushInterval is a time.Duration, and specifies the maximum interval for
// packet sending. Note that if you send lots of metrics, you will send more
// often. This is just a maximal threshold.
//
// flushBytes specifies the maximum udp packet size you wish to send. If adding
// a metric would result in a larger packet than flushBytes, the packet will
// first be send, then the new data will be added to the next packet.
//
// If flushBytes is 0, defaults to 1432 bytes, which is considered safe
// for local traffic. If sending over the public internet, 512 bytes is
// the recommended value.
func NewBufferedClient(addr, prefix string, flushInterval time.Duration, flushBytes int) (Statter, error) {
if flushBytes <= 0 {
// https://github.com/etsy/statsd/blob/master/docs/metric_types.md#multi-metric-packets
flushBytes = 1432
}
if flushInterval <= time.Duration(0) {
flushInterval = 300 * time.Millisecond
}
sender, err := NewBufferedSender(addr, flushInterval, flushBytes)
if err != nil {
return nil, err
}
client := &Client{
prefix: prefix,
sender: sender,
}
return client, nil
}

View File

@ -1,104 +0,0 @@
package statsd
import "time"
type NoopClient struct {
// prefix for statsd name
prefix string
}
// Close closes the connection and cleans up.
func (s *NoopClient) Close() error {
return nil
}
// Increments a statsd count type.
// stat is a string name for the metric.
// value is the integer value
// rate is the sample rate (0.0 to 1.0)
func (s *NoopClient) Inc(stat string, value int64, rate float32) error {
return nil
}
// Decrements a statsd count type.
// stat is a string name for the metric.
// value is the integer value.
// rate is the sample rate (0.0 to 1.0).
func (s *NoopClient) Dec(stat string, value int64, rate float32) error {
return nil
}
// Submits/Updates a statsd gauge type.
// stat is a string name for the metric.
// value is the integer value.
// rate is the sample rate (0.0 to 1.0).
func (s *NoopClient) Gauge(stat string, value int64, rate float32) error {
return nil
}
// Submits a delta to a statsd gauge.
// stat is the string name for the metric.
// value is the (positive or negative) change.
// rate is the sample rate (0.0 to 1.0).
func (s *NoopClient) GaugeDelta(stat string, value int64, rate float32) error {
return nil
}
// Submits a statsd timing type.
// stat is a string name for the metric.
// delta is the time duration value in milliseconds
// rate is the sample rate (0.0 to 1.0).
func (s *NoopClient) Timing(stat string, delta int64, rate float32) error {
return nil
}
// Submits a statsd timing type.
// stat is a string name for the metric.
// delta is the timing value as time.Duration
// rate is the sample rate (0.0 to 1.0).
func (s *NoopClient) TimingDuration(stat string, delta time.Duration, rate float32) error {
return nil
}
// Submits a stats set type.
// stat is a string name for the metric.
// value is the string value
// rate is the sample rate (0.0 to 1.0).
func (s *NoopClient) Set(stat string, value string, rate float32) error {
return nil
}
// Submits a number as a stats set type.
// convenience method for Set with number.
// stat is a string name for the metric.
// value is the integer value
// rate is the sample rate (0.0 to 1.0).
func (s *NoopClient) SetInt(stat string, value int64, rate float32) error {
return nil
}
// Raw formats the statsd event data, handles sampling, prepares it,
// and sends it to the server.
// stat is the string name for the metric.
// value is the preformatted "raw" value string.
// rate is the sample rate (0.0 to 1.0).
func (s *NoopClient) Raw(stat string, value string, rate float32) error {
return nil
}
// Sets/Updates the statsd client prefix
func (s *NoopClient) SetPrefix(prefix string) {
s.prefix = prefix
}
// Returns a pointer to a new NoopClient, and an error (always nil, just
// supplied to support api convention).
// Use variadic arguments to support identical format as NewClient, or a more
// conventional no argument form.
func NewNoopClient(a ...interface{}) (Statter, error) {
noopClient := &NoopClient{}
return noopClient, nil
}
// Compatibility alias
var NewNoop = NewNoopClient

View File

@ -1,25 +0,0 @@
/*
Package statsd provides a StatsD client implementation that is safe for
concurrent use by multiple goroutines and for efficiency can be created and
reused.
Example usage:
// first create a client
client, err := statsd.NewClient("127.0.0.1:8125", "test-client")
// handle any errors
if err != nil {
log.Fatal(err)
}
// make sure to clean up
defer client.Close()
// Send a stat
err = client.Inc("stat1", 42, 1.0)
// handle any errors
if err != nil {
log.Printf("Error sending metric: %+v", err)
}
*/
package statsd

View File

@ -1,62 +0,0 @@
package statsd
import (
"errors"
"net"
)
type Sender interface {
Send(data []byte) (int, error)
Close() error
}
// SimpleSender provides a socket send interface.
type SimpleSender struct {
// underlying connection
c net.PacketConn
// resolved udp address
ra *net.UDPAddr
}
// Send sends the data to the server endpoint.
func (s *SimpleSender) Send(data []byte) (int, error) {
// no need for locking here, as the underlying fdNet
// already serialized writes
n, err := s.c.(*net.UDPConn).WriteToUDP(data, s.ra)
if err != nil {
return 0, err
}
if n == 0 {
return n, errors.New("Wrote no bytes")
}
return n, nil
}
// Closes SimpleSender
func (s *SimpleSender) Close() error {
err := s.c.Close()
return err
}
// Returns a new SimpleSender for sending to the supplied addresss.
//
// addr is a string of the format "hostname:port", and must be parsable by
// net.ResolveUDPAddr.
func NewSimpleSender(addr string) (Sender, error) {
c, err := net.ListenPacket("udp", ":0")
if err != nil {
return nil, err
}
ra, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
sender := &SimpleSender{
c: c,
ra: ra,
}
return sender, nil
}

View File

@ -1,158 +0,0 @@
package statsd
import (
"bytes"
"fmt"
"sync"
"time"
)
// BufferedSender provides a buffered statsd udp, sending multiple
// metrics, where possible.
type BufferedSender struct {
flushBytes int
flushInterval time.Duration
sender Sender
buffer *bytes.Buffer
reqs chan []byte
shutdown chan chan error
running bool
mx sync.RWMutex
}
// Send bytes.
func (s *BufferedSender) Send(data []byte) (int, error) {
s.mx.RLock()
defer s.mx.RUnlock()
if !s.running {
return 0, fmt.Errorf("BufferedSender is not running")
}
// copy bytes, because the caller may mutate the slice (and the underlying
// array) after we return, since we may not end up sending right away.
c := make([]byte, len(data))
dlen := copy(c, data)
s.reqs <- c
return dlen, nil
}
// Close Buffered Sender
func (s *BufferedSender) Close() error {
// only need really read lock to see if we are currently
// running or not
s.mx.RLock()
if !s.running {
s.mx.RUnlock()
return nil
}
s.mx.RUnlock()
// since we are running, write lock during cleanup
s.mx.Lock()
defer s.mx.Unlock()
errChan := make(chan error)
s.running = false
s.shutdown <- errChan
return <-errChan
}
// Start Buffered Sender
// Begins ticker and read loop
func (s *BufferedSender) Start() {
// read lock to see if we are running
s.mx.RLock()
if s.running {
s.mx.RUnlock()
return
}
s.mx.RUnlock()
// write lock to start running
s.mx.Lock()
defer s.mx.Unlock()
s.running = true
s.reqs = make(chan []byte, 8)
go s.run()
}
func (s *BufferedSender) run() {
ticker := time.NewTicker(s.flushInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if s.buffer.Len() > 0 {
s.flush()
}
case req := <-s.reqs:
// StatsD supports receiving multiple metrics in a single packet by
// separating them with a newline.
if s.buffer.Len()+len(req)+1 > s.flushBytes {
s.flush()
}
s.buffer.Write(req)
s.buffer.WriteByte('\n')
// if we happen to fill up the buffer, just flush right away
// instead of waiting for next input.
if s.buffer.Len() >= s.flushBytes {
s.flush()
}
case errChan := <-s.shutdown:
close(s.reqs)
for req := range s.reqs {
if s.buffer.Len()+len(req)+1 > s.flushBytes {
s.flush()
}
s.buffer.Write(req)
s.buffer.WriteByte('\n')
}
if s.buffer.Len() > 0 {
s.flush()
}
errChan <- s.sender.Close()
return
}
}
}
// flush the buffer/send to remove endpoint.
func (s *BufferedSender) flush() (int, error) {
n, err := s.sender.Send(bytes.TrimSuffix(s.buffer.Bytes(), []byte("\n")))
s.buffer.Reset() // clear the buffer
return n, err
}
// Returns a new BufferedSender
//
// addr is a string of the format "hostname:port", and must be parsable by
// net.ResolveUDPAddr.
//
// flushInterval is a time.Duration, and specifies the maximum interval for
// packet sending. Note that if you send lots of metrics, you will send more
// often. This is just a maximal threshold.
//
// flushBytes specifies the maximum udp packet size you wish to send. If adding
// a metric would result in a larger packet than flushBytes, the packet will
// first be send, then the new data will be added to the next packet.
func NewBufferedSender(addr string, flushInterval time.Duration, flushBytes int) (Sender, error) {
simpleSender, err := NewSimpleSender(addr)
if err != nil {
return nil, err
}
sender := &BufferedSender{
flushBytes: flushBytes,
flushInterval: flushInterval,
sender: simpleSender,
buffer: bytes.NewBuffer(make([]byte, 0, flushBytes)),
shutdown: make(chan chan error),
}
sender.Start()
return sender, nil
}