Restore Dogstatsd exporter (#10)

* Restore Dogstatsd exporter

* Lower MTU

* Split array values across packets

* Lower MaxPacketSize to 1432
This commit is contained in:
Joshua MacDonald 2020-04-14 10:52:14 -07:00 committed by GitHub
parent 580248e374
commit ea24ca8575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1106 additions and 0 deletions

View File

@ -0,0 +1,109 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dogstatsd // import "github.com/open-telemetry/opentelemetry-go-contrib/exporters/metric/dogstatsd"
import (
"bytes"
"time"
"github.com/open-telemetry/opentelemetry-go-contrib/exporters/metric/dogstatsd/internal/statsd"
"go.opentelemetry.io/otel/api/global"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
)
type (
Config = statsd.Config
// Exporter implements a dogstatsd-format statsd exporter,
// which encodes label sets as independent fields in the
// output.
//
// TODO: find a link for this syntax. It's been copied out of
// code, not a specification:
//
// https://github.com/stripe/veneur/blob/master/sinks/datadog/datadog.go
Exporter struct {
*statsd.Exporter
labelEncoder *statsd.LabelEncoder
}
)
var (
_ export.Exporter = &Exporter{}
)
// NewRawExporter returns a new Dogstatsd-syntax exporter for use in a pipeline.
func NewRawExporter(config Config) (*Exporter, error) {
exp := &Exporter{
labelEncoder: statsd.NewLabelEncoder(),
}
var err error
exp.Exporter, err = statsd.NewExporter(config, exp)
return exp, err
}
// InstallNewPipeline instantiates a NewExportPipeline and registers it globally.
// Typically called as:
//
// pipeline, err := dogstatsd.InstallNewPipeline(dogstatsd.Config{...})
// if err != nil {
// ...
// }
// defer pipeline.Stop()
// ... Done
func InstallNewPipeline(config Config) (*push.Controller, error) {
controller, err := NewExportPipeline(config, time.Minute)
if err != nil {
return controller, err
}
global.SetMeterProvider(controller)
return controller, err
}
// NewExportPipeline sets up a complete export pipeline with the recommended setup,
// chaining a NewRawExporter into the recommended selectors and batchers.
func NewExportPipeline(config Config, period time.Duration) (*push.Controller, error) {
selector := simple.NewWithExactMeasure()
exporter, err := NewRawExporter(config)
if err != nil {
return nil, err
}
// The ungrouped batcher ensures that the export sees the full
// set of labels as dogstatsd tags.
batcher := ungrouped.New(selector, exporter.labelEncoder, false)
pusher := push.New(batcher, exporter, period)
pusher.Start()
return pusher, nil
}
// AppendName is part of the stats-internal adapter interface.
func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())
}
// AppendTags is part of the stats-internal adapter interface.
func (e *Exporter) AppendTags(rec export.Record, buf *bytes.Buffer) {
encoded := rec.Labels().Encoded(e.labelEncoder)
_, _ = buf.WriteString(encoded)
}

View File

@ -0,0 +1,58 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dogstatsd_test
import (
"bytes"
"context"
"testing"
"github.com/open-telemetry/opentelemetry-go-contrib/exporters/metric/dogstatsd"
"github.com/open-telemetry/opentelemetry-go-contrib/exporters/metric/dogstatsd/internal/statsd"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/exporters/metric/test"
"go.opentelemetry.io/otel/sdk/metric/aggregator/sum"
)
// TestDogstatsLabels that labels are formatted in the correct style,
// whether or not the provided labels were encoded by a statsd label
// encoder.
func TestDogstatsLabels(t *testing.T) {
encoder := statsd.NewLabelEncoder()
ctx := context.Background()
checkpointSet := test.NewCheckpointSet(encoder)
desc := metric.NewDescriptor("test.name", metric.CounterKind, core.Int64NumberKind)
cagg := sum.New()
_ = cagg.Update(ctx, core.NewInt64Number(123), &desc)
cagg.Checkpoint(ctx, &desc)
checkpointSet.Add(&desc, cagg, key.New("A").String("B"))
var buf bytes.Buffer
exp, err := dogstatsd.NewRawExporter(dogstatsd.Config{
Writer: &buf,
})
require.Nil(t, err)
err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
require.Equal(t, "test.name:123|c|#A:B\n", buf.String())
}

View File

@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dogstatsd_test
import (
"context"
"fmt"
"io"
"log"
"sync"
"time"
"github.com/open-telemetry/opentelemetry-go-contrib/exporters/metric/dogstatsd"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
)
func ExampleNew() {
// Create a "server"
wg := &sync.WaitGroup{}
wg.Add(1)
reader, writer := io.Pipe()
go func() {
defer wg.Done()
for {
var buf [4096]byte
n, err := reader.Read(buf[:])
if err == io.EOF {
return
} else if err != nil {
log.Fatal("Read err: ", err)
} else if n >= len(buf) {
log.Fatal("Read small buffer: ", n)
} else {
fmt.Print(string(buf[0:n]))
}
}
}()
// Create a meter
pusher, err := dogstatsd.NewExportPipeline(dogstatsd.Config{
// The Writer field provides test support.
Writer: writer,
// In real code, use the URL field:
//
// URL: fmt.Sprint("unix://", path),
}, time.Minute)
if err != nil {
log.Fatal("Could not initialize dogstatsd exporter:", err)
}
ctx := context.Background()
key := key.New("key")
// pusher implements the metric.MeterProvider interface:
meter := pusher.Meter("example")
// Create and update a single counter:
counter := metric.Must(meter).NewInt64Counter("a.counter", metric.WithKeys(key))
counter.Add(ctx, 100, key.String("value"))
// Flush the exporter, close the pipe, and wait for the reader.
pusher.Stop()
writer.Close()
wg.Wait()
// Output:
// a.counter:100|c|#key:value
}

View File

@ -0,0 +1,8 @@
module github.com/open-telemetry/opentelemetry-go-contrib/exporters/metric/dogstatsd
go 1.14
require (
github.com/stretchr/testify v1.5.1
go.opentelemetry.io/otel v0.4.2
)

View File

@ -0,0 +1,78 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs=
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60=
github.com/benbjohnson/clock v1.0.0 h1:78Jk/r6m4wCi6sndMpty7A//t4dw/RW5fV4ZgDVfX1w=
github.com/benbjohnson/clock v1.0.0/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/opentracing/opentracing-go v1.1.1-0.20190913142402-a7454ce5950e/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
go.opentelemetry.io/otel v0.4.2 h1:nT+GOqqRR1cIY92xmo1DeiXLHtIlXH1KLRgnsnhuNrs=
go.opentelemetry.io/otel v0.4.2/go.mod h1:OgNpQOjrlt33Ew6Ds0mGjmcTQg/rhUctsbkRdk/g1fw=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03 h1:4HYDjxeNXAOTv3o1N2tjo8UUSlhQgAD52FVkwxnWgM8=
google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -0,0 +1,310 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd
// See https://github.com/b/statsd_spec for the best-available statsd
// syntax specification. See also
// https://github.com/statsd/statsd/edit/master/docs/metric_types.md
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"strconv"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/unit"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregator"
)
type (
// Config supports common configuration that applies to statsd exporters.
Config struct {
// URL describes the destination for exporting statsd data.
// e.g., udp://host:port
// tcp://host:port
// unix:///socket/path
URL string
// Writer is an alternate to providing a URL. When Writer is
// non-nil, URL will be ignored and the exporter will write to
// the configured Writer interface.
Writer io.Writer
// MaxPacketSize this limits the packet size for packet-oriented transports.
MaxPacketSize int
// TODO support Dial and Write timeouts
}
// Exporter is common type meant to implement concrete statsd
// exporters.
Exporter struct {
adapter Adapter
config Config
conn net.Conn
writer io.Writer
buffer bytes.Buffer
}
// Adapter supports statsd syntax variations, primarily plain
// statsd vs. dogstatsd.
Adapter interface {
AppendName(export.Record, *bytes.Buffer)
AppendTags(export.Record, *bytes.Buffer)
}
)
const (
formatCounter = "c"
formatHistogram = "h"
formatGauge = "g"
formatTiming = "ms"
// MaxPacketSize defaults to the smallest value known to work
// across all cloud providers. If the packets are too large,
// you will see "write: message too long" errors.
MaxPacketSize = 1432
)
var (
_ export.Exporter = &Exporter{}
ErrInvalidScheme = fmt.Errorf("invalid statsd transport")
)
// NewExporter returns a common implementation for exporters that Export
// statsd syntax.
func NewExporter(config Config, adapter Adapter) (*Exporter, error) {
if config.MaxPacketSize <= 0 {
config.MaxPacketSize = MaxPacketSize
}
var writer io.Writer
var conn net.Conn
var err error
if config.Writer != nil {
writer = config.Writer
} else {
conn, err = dial(config.URL)
if conn != nil {
writer = conn
}
}
// TODO: If err != nil, we return it _with_ a valid exporter; the
// exporter should attempt to re-dial if it's retryable. Add a
// Start() and Stop() API.
return &Exporter{
adapter: adapter,
config: config,
conn: conn,
writer: writer,
}, err
}
// dial connects to a statsd service using several common network
// types. Presently "udp" and "unix" datagram socket connections are
// supported.
func dial(endpoint string) (net.Conn, error) {
dest, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
// TODO: Support tcp destination, need configurable timeouts first.
scheme := dest.Scheme
switch scheme {
case "udp", "udp4", "udp6":
udpAddr, err := net.ResolveUDPAddr(scheme, dest.Host)
locAddr := &net.UDPAddr{}
if err != nil {
return nil, err
}
conn, err := net.DialUDP(scheme, locAddr, udpAddr)
if err != nil {
return nil, err
}
return conn, err
case "unix", "unixgram":
scheme = "unixgram"
locAddr := &net.UnixAddr{}
sockAddr, err := net.ResolveUnixAddr(scheme, dest.Path)
if err != nil {
return nil, err
}
conn, err := net.DialUnix(scheme, locAddr, sockAddr)
if err != nil {
return nil, err
}
return conn, err
}
return nil, ErrInvalidScheme
}
// Export is common code for any statsd-based metric.Exporter implementation.
func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error {
buf := &e.buffer
buf.Reset()
var aggErr error
var sendErr error
aggErr = checkpointSet.ForEach(func(rec export.Record) error {
pts, err := e.countPoints(rec)
if err != nil {
return err
}
for pt := 0; pt < pts; pt++ {
before := buf.Len()
if err := e.formatMetric(rec, pt, buf); err != nil {
return err
}
if buf.Len() < e.config.MaxPacketSize {
continue
}
if before == 0 {
// A single metric >= packet size
if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
sendErr = err
}
buf.Reset()
continue
}
// Send and copy the leftover
if err := e.send(buf.Bytes()[:before]); err != nil && sendErr == nil {
sendErr = err
}
leftover := buf.Len() - before
copy(buf.Bytes()[0:leftover], buf.Bytes()[before:])
buf.Truncate(leftover)
}
return nil
})
if err := e.send(buf.Bytes()); err != nil && sendErr == nil {
sendErr = err
}
if sendErr != nil {
return sendErr
}
return aggErr
}
// send writes a complete buffer to the writer as a blocking call.
func (e *Exporter) send(buf []byte) error {
for len(buf) != 0 {
n, err := e.writer.Write(buf)
if err != nil {
return err
}
buf = buf[n:]
}
return nil
}
// countPoints returns the number of separate statsd points contained
// in this record.
func (e *Exporter) countPoints(rec export.Record) (int, error) {
agg := rec.Aggregator()
if pts, ok := agg.(aggregator.Points); ok {
points, err := pts.Points()
if err != nil {
return 0, err
}
return len(points), nil
}
return 1, nil
}
// formatMetric formats an individual export record. For some records
// this will emit a single statistic, for some it will emit more than
// one.
func (e *Exporter) formatMetric(rec export.Record, pos int, buf *bytes.Buffer) error {
desc := rec.Descriptor()
agg := rec.Aggregator()
// TODO handle non-Points Distribution/MaxSumCount by
// formatting individual quantiles, the sum, and the count as
// single statistics. For the dogstatsd variation, assuming
// open-source systems like Veneur add support, figure out the
// proper encoding for "d"-type distribution data.
if pts, ok := agg.(aggregator.Points); ok {
var format string
if desc.Unit() == unit.Milliseconds {
format = formatTiming
} else {
format = formatHistogram
}
points, err := pts.Points()
if err != nil {
return err
}
e.formatSingleStat(rec, points[pos], format, buf)
} else if sum, ok := agg.(aggregator.Sum); ok {
sum, err := sum.Sum()
if err != nil {
return err
}
e.formatSingleStat(rec, sum, formatCounter, buf)
} else if lv, ok := agg.(aggregator.LastValue); ok {
lv, _, err := lv.LastValue()
if err != nil {
return err
}
e.formatSingleStat(rec, lv, formatGauge, buf)
}
return nil
}
// formatSingleStat encodes a single item of statsd data followed by a
// newline.
func (e *Exporter) formatSingleStat(rec export.Record, val core.Number, fmtStr string, buf *bytes.Buffer) {
e.adapter.AppendName(rec, buf)
_, _ = buf.WriteRune(':')
writeNumber(buf, val, rec.Descriptor().NumberKind())
_, _ = buf.WriteRune('|')
_, _ = buf.WriteString(fmtStr)
e.adapter.AppendTags(rec, buf)
_, _ = buf.WriteRune('\n')
}
func writeNumber(buf *bytes.Buffer, num core.Number, kind core.NumberKind) {
var tmp [128]byte
var conv []byte
switch kind {
case core.Int64NumberKind:
conv = strconv.AppendInt(tmp[:0], num.AsInt64(), 10)
case core.Float64NumberKind:
conv = strconv.AppendFloat(tmp[:0], num.AsFloat64(), 'g', -1, 64)
case core.Uint64NumberKind:
conv = strconv.AppendUint(tmp[:0], num.AsUint64(), 10)
}
_, _ = buf.Write(conv)
}

View File

@ -0,0 +1,341 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd_test
import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"testing"
"github.com/open-telemetry/opentelemetry-go-contrib/exporters/metric/dogstatsd/internal/statsd"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/unit"
"go.opentelemetry.io/otel/exporters/metric/test"
export "go.opentelemetry.io/otel/sdk/export/metric"
)
// withTagsAdapter tests a dogstatsd-style statsd exporter.
type withTagsAdapter struct {
*statsd.LabelEncoder
}
func (*withTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())
}
func (ta *withTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) {
encoded := rec.Labels().Encoded(ta.LabelEncoder)
_, _ = buf.WriteString(encoded)
}
func newWithTagsAdapter() *withTagsAdapter {
return &withTagsAdapter{
statsd.NewLabelEncoder(),
}
}
// noTagsAdapter simulates a plain-statsd exporter that appends tag
// values to the metric name.
type noTagsAdapter struct {
}
func (*noTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) {
_, _ = buf.WriteString(rec.Descriptor().Name())
iter := rec.Labels().Iter()
for iter.Next() {
tag := iter.Label()
_, _ = buf.WriteString(".")
_, _ = buf.WriteString(tag.Value.Emit())
}
}
func (*noTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) {
}
func newNoTagsAdapter() *noTagsAdapter {
return &noTagsAdapter{}
}
type testWriter struct {
vec []string
}
func (w *testWriter) Write(b []byte) (int, error) {
w.vec = append(w.vec, string(b))
return len(b), nil
}
func TestBasicFormat(t *testing.T) {
type adapterOutput struct {
adapter statsd.Adapter
expected string
}
for _, ao := range []adapterOutput{{
adapter: newWithTagsAdapter(),
expected: `counter:%s|c|#A:B,C:D
observer:%s|g|#A:B,C:D
measure:%s|h|#A:B,C:D
timer:%s|ms|#A:B,C:D
`}, {
adapter: newNoTagsAdapter(),
expected: `counter.B.D:%s|c
observer.B.D:%s|g
measure.B.D:%s|h
timer.B.D:%s|ms
`},
} {
adapter := ao.adapter
expected := ao.expected
t.Run(fmt.Sprintf("%T", adapter), func(t *testing.T) {
for _, nkind := range []core.NumberKind{
core.Float64NumberKind,
core.Int64NumberKind,
} {
t.Run(nkind.String(), func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
Writer: writer,
MaxPacketSize: 1024,
}
exp, err := statsd.NewExporter(config, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
checkpointSet := test.NewCheckpointSet(export.NewDefaultLabelEncoder())
cdesc := metric.NewDescriptor(
"counter", metric.CounterKind, nkind)
gdesc := metric.NewDescriptor(
"observer", metric.ObserverKind, nkind)
mdesc := metric.NewDescriptor(
"measure", metric.MeasureKind, nkind)
tdesc := metric.NewDescriptor(
"timer", metric.MeasureKind, nkind, metric.WithUnit(unit.Milliseconds))
labels := []core.KeyValue{
key.New("A").String("B"),
key.New("C").String("D"),
}
const value = 123.456
checkpointSet.AddCounter(&cdesc, value, labels...)
checkpointSet.AddLastValue(&gdesc, value, labels...)
checkpointSet.AddMeasure(&mdesc, value, labels...)
checkpointSet.AddMeasure(&tdesc, value, labels...)
err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
var vfmt string
if nkind == core.Int64NumberKind {
fv := value
vfmt = strconv.FormatInt(int64(fv), 10)
} else {
vfmt = strconv.FormatFloat(value, 'g', -1, 64)
}
require.Equal(t, 1, len(writer.vec))
require.Equal(t, fmt.Sprintf(expected, vfmt, vfmt, vfmt, vfmt), writer.vec[0])
})
}
})
}
}
func makeLabels(offset, nkeys int) []core.KeyValue {
r := make([]core.KeyValue, nkeys)
for i := range r {
r[i] = key.New(fmt.Sprint("k", offset+i)).String(fmt.Sprint("v", offset+i))
}
return r
}
type splitTestCase struct {
name string
setup func(add func(int))
check func(expected, got []string, t *testing.T)
}
var splitTestCases = []splitTestCase{
// These test use the number of keys to control where packets
// are split.
{"Simple",
func(add func(int)) {
add(1)
add(1000)
add(1)
},
func(expected, got []string, t *testing.T) {
require.EqualValues(t, expected, got)
},
},
{"LastBig",
func(add func(int)) {
add(1)
add(1)
add(1000)
},
func(expected, got []string, t *testing.T) {
require.Equal(t, 2, len(got))
require.EqualValues(t, []string{
expected[0] + expected[1],
expected[2],
}, got)
},
},
{"FirstBig",
func(add func(int)) {
add(1000)
add(1)
add(1)
add(1000)
add(1)
add(1)
},
func(expected, got []string, t *testing.T) {
require.Equal(t, 4, len(got))
require.EqualValues(t, []string{
expected[0],
expected[1] + expected[2],
expected[3],
expected[4] + expected[5],
}, got)
},
},
{"OneBig",
func(add func(int)) {
add(1000)
},
func(expected, got []string, t *testing.T) {
require.EqualValues(t, expected, got)
},
},
{"LastSmall",
func(add func(int)) {
add(1000)
add(1)
},
func(expected, got []string, t *testing.T) {
require.EqualValues(t, expected, got)
},
},
{"Overflow",
func(add func(int)) {
for i := 0; i < 1000; i++ {
add(1)
}
},
func(expected, got []string, t *testing.T) {
require.Less(t, 1, len(got))
require.Equal(t, strings.Join(expected, ""), strings.Join(got, ""))
},
},
{"Empty",
func(add func(int)) {
},
func(expected, got []string, t *testing.T) {
require.Equal(t, 0, len(got))
},
},
{"AllBig",
func(add func(int)) {
add(1000)
add(1000)
add(1000)
},
func(expected, got []string, t *testing.T) {
require.EqualValues(t, expected, got)
},
},
}
func TestPacketSplit(t *testing.T) {
for _, tcase := range splitTestCases {
t.Run(tcase.name, func(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
Writer: writer,
MaxPacketSize: 1024,
}
adapter := newWithTagsAdapter()
exp, err := statsd.NewExporter(config, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
checkpointSet := test.NewCheckpointSet(adapter.LabelEncoder)
desc := metric.NewDescriptor("counter", metric.CounterKind, core.Int64NumberKind)
var expected []string
offset := 0
tcase.setup(func(nkeys int) {
labels := makeLabels(offset, nkeys)
offset += nkeys
iter := export.LabelSlice(labels).Iter()
encoded := adapter.LabelEncoder.Encode(iter)
expect := fmt.Sprint("counter:100|c", encoded, "\n")
expected = append(expected, expect)
checkpointSet.AddCounter(&desc, 100, labels...)
})
err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
tcase.check(expected, writer.vec, t)
})
}
}
func TestArraySplit(t *testing.T) {
ctx := context.Background()
writer := &testWriter{}
config := statsd.Config{
Writer: writer,
MaxPacketSize: 1024,
}
adapter := newWithTagsAdapter()
exp, err := statsd.NewExporter(config, adapter)
if err != nil {
t.Fatal("New error: ", err)
}
checkpointSet := test.NewCheckpointSet(adapter.LabelEncoder)
desc := metric.NewDescriptor("measure", metric.MeasureKind, core.Int64NumberKind)
for i := 0; i < 1024; i++ {
checkpointSet.AddMeasure(&desc, 100)
}
err = exp.Export(ctx, checkpointSet)
require.Nil(t, err)
require.Greater(t, len(writer.vec), 1)
for _, result := range writer.vec {
require.LessOrEqual(t, len(result), config.MaxPacketSize)
}
}

View File

@ -0,0 +1,70 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd
import (
"bytes"
"sync"
export "go.opentelemetry.io/otel/sdk/export/metric"
)
// LabelEncoder encodes metric labels in the dogstatsd syntax.
//
// TODO: find a link for this syntax. It's been copied out of code,
// not a specification:
//
// https://github.com/stripe/veneur/blob/master/sinks/datadog/datadog.go
type LabelEncoder struct {
pool sync.Pool
}
var _ export.LabelEncoder = &LabelEncoder{}
var leID = export.NewLabelEncoderID()
// NewLabelEncoder returns a new encoder for dogstatsd-syntax metric
// labels.
func NewLabelEncoder() *LabelEncoder {
return &LabelEncoder{
pool: sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
},
}
}
// Encode emits a string like "|#key1:value1,key2:value2".
func (e *LabelEncoder) Encode(iter export.LabelIterator) string {
buf := e.pool.Get().(*bytes.Buffer)
defer e.pool.Put(buf)
buf.Reset()
delimiter := "|#"
for iter.Next() {
kv := iter.Label()
_, _ = buf.WriteString(delimiter)
_, _ = buf.WriteString(string(kv.Key))
_, _ = buf.WriteRune(':')
_, _ = buf.WriteString(kv.Value.Emit())
delimiter = ","
}
return buf.String()
}
func (*LabelEncoder) ID() int64 {
return leID
}

View File

@ -0,0 +1,45 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statsd_test
import (
"testing"
"github.com/open-telemetry/opentelemetry-go-contrib/exporters/metric/dogstatsd/internal/statsd"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
export "go.opentelemetry.io/otel/sdk/export/metric"
)
var testLabels = []core.KeyValue{
key.New("A").String("B"),
key.New("C").String("D"),
key.New("E").Float64(1.5),
}
func TestLabelSyntax(t *testing.T) {
encoder := statsd.NewLabelEncoder()
require.Equal(t, `|#A:B,C:D,E:1.5`, encoder.Encode(export.LabelSlice(testLabels).Iter()))
kvs := []core.KeyValue{
key.New("A").String("B"),
}
require.Equal(t, `|#A:B`, encoder.Encode(export.LabelSlice(kvs).Iter()))
require.Equal(t, "", encoder.Encode(export.LabelSlice(nil).Iter()))
}