support uploading generated blocks to object storage (#42)

* support uploading generated blocks to object storage

Signed-off-by: Ben Ye <benye@amazon.com>

* fix lint

Signed-off-by: Ben Ye <benye@amazon.com>

* lint

Signed-off-by: Ben Ye <benye@amazon.com>

* update test

Signed-off-by: Ben Ye <benye@amazon.com>

* update go mod

Signed-off-by: Ben Ye <benye@amazon.com>

Signed-off-by: Ben Ye <benye@amazon.com>
This commit is contained in:
Ben Ye 2022-11-29 08:30:41 -08:00 committed by GitHub
parent a337de5226
commit aac3b58023
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 944 additions and 872 deletions

View File

@ -1,3 +1,3 @@
(github.com/go-kit/kit/log.Logger).Log
(github.com/go-kit/log.Logger).Log
fmt.Fprintln
fmt.Fprint

View File

@ -11,13 +11,18 @@ import (
"strings"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/run"
"github.com/pkg/errors"
promModel "github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanosbench/pkg/blockgen"
"gopkg.in/alecthomas/kingpin.v2"
@ -43,7 +48,8 @@ func registerBlock(m map[string]setupFunc, app *kingpin.Application) {
}
func registerBlockGen(m map[string]setupFunc, root *kingpin.CmdClause) {
cmd := root.Command("gen", "Generates Prometheus/Thanos TSDB blocks from input. Expects []blockgen.BlockSpec in YAML format as input.")
config := extflag.RegisterPathOrContent(cmd, "config", "YAML for []blockgen.BlockSpec. Leave this empty in order to be able to pass this through STDIN", false)
config := extflag.RegisterPathOrContent(cmd, "config", "YAML for []blockgen.BlockSpec. Leave this empty in order to be able to pass this through STDIN", extflag.WithEnvSubstitution())
objStore := *extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
outputDir := cmd.Flag("output.dir", "Output directory for generated data.").Required().String()
workers := cmd.Flag("workers", "Number of go routines for block generation. If 0, 2*runtime.GOMAXPROCS(0) is used.").Int()
m["block gen"] = func(g *run.Group, logger log.Logger) error {
@ -59,6 +65,25 @@ func registerBlockGen(m map[string]setupFunc, root *kingpin.CmdClause) {
return err
}
objStoreContentYaml, err := objStore.Content()
if err != nil {
return errors.Wrap(err, "getting object store config")
}
var (
upload bool
bkt objstore.InstrumentedBucket
)
if len(objStoreContentYaml) == 0 {
level.Info(logger).Log("msg", "no supported bucket was configured, uploads will be disabled")
} else {
upload = true
bkt, err = client.NewBucket(logger, objStoreContentYaml, nil, "blockgen")
if err != nil {
return err
}
}
n := 0
if len(cfg) > 0 {
bs := []blockgen.BlockSpec{}
@ -72,8 +97,16 @@ func registerBlockGen(m map[string]setupFunc, root *kingpin.CmdClause) {
return errors.Wrap(err, "generate")
}
n++
level.Info(logger).Log("msg", "generated block", "path", path.Join(*outputDir, id.String()), "count", n)
blockDir := path.Join(*outputDir, id.String())
level.Info(logger).Log("msg", "generated block", "path", blockDir, "count", n)
runtime.GC()
if upload {
if err := block.Upload(ctx, logger, bkt, blockDir, metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload block %s", id)
}
level.Info(logger).Log("msg", "uploaded block to object storage", "path", blockDir)
}
}
return ctx.Err()
}
@ -97,8 +130,16 @@ func registerBlockGen(m map[string]setupFunc, root *kingpin.CmdClause) {
return errors.Wrap(err, "generate")
}
n++
level.Info(logger).Log("msg", "generated block", "path", path.Join(*outputDir, id.String()), "count", n)
blockDir := path.Join(*outputDir, id.String())
level.Info(logger).Log("msg", "generated block", "path", blockDir, "count", n)
runtime.GC()
if upload {
if err := block.Upload(ctx, logger, bkt, blockDir, metadata.NoneFunc); err != nil {
return errors.Wrapf(err, "upload block %s", id)
}
level.Info(logger).Log("msg", "uploaded block to object storage", "path", blockDir)
}
}
return ctx.Err()
}, func(error) { cancel() })

View File

@ -8,8 +8,8 @@ import (
"runtime"
"syscall"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/run"
"github.com/pkg/errors"
"github.com/prometheus/common/version"

View File

@ -6,18 +6,18 @@ import (
"io"
"math"
"math/rand"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/run"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gopkg.in/alecthomas/kingpin.v2"
)
@ -38,7 +38,7 @@ func registerStress(m map[string]setupFunc, app *kingpin.Application) {
m["stress"] = func(g *run.Group, logger log.Logger) error {
mainCtx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
conn, err := grpc.Dial((*target).String(), grpc.WithInsecure())
conn, err := grpc.Dial((*target).String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return err
}

View File

@ -3,9 +3,9 @@ package main
import (
"os"
"github.com/go-kit/kit/log"
extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/log"
"github.com/oklog/run"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanosbench/pkg/walgen"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v2"
@ -13,7 +13,7 @@ import (
func registerWalgen(m map[string]setupFunc, app *kingpin.Application) {
cmd := app.Command("walgen", "Generates TSDB data into WAL files.")
config := extflag.RegisterPathOrContent(cmd, "config", "YAML for series config. See walgen.Config for the format.", true)
config := extflag.RegisterPathOrContent(cmd, "config", "YAML for series config. See walgen.Config for the format.", extflag.WithRequired(), extflag.WithEnvSubstitution())
outputDir := cmd.Flag("output.dir", "Output directory for generated TSDB data.").Required().String()

View File

@ -375,7 +375,7 @@ func GenPrometheus(gen *mimic.Generator, opts PrometheusOpts) {
},
ImagePullPolicy: corev1.PullAlways,
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Port: intstr.FromInt(int(httpPort)),
Path: "-/ready",
@ -384,7 +384,7 @@ func GenPrometheus(gen *mimic.Generator, opts PrometheusOpts) {
SuccessThreshold: 3,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/-/healthy",
Port: intstr.FromInt(9090),
@ -441,7 +441,7 @@ func GenPrometheus(gen *mimic.Generator, opts PrometheusOpts) {
},
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Port: intstr.FromInt(int(httpSidecarPort)),
Path: "metrics",

View File

@ -117,7 +117,7 @@ func GenThanosQuerier(gen *mimic.Generator, opts QuerierOpts) {
}},
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Port: intstr.FromInt(httpPort),
Path: func() string {
@ -133,7 +133,7 @@ func GenThanosQuerier(gen *mimic.Generator, opts QuerierOpts) {
FailureThreshold: 3,
},
LivenessProbe: &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/-/healthy",
Port: intstr.FromInt(httpPort),

View File

@ -3,8 +3,8 @@ package k8s
import (
"time"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanosbench/pkg/blockgen"

View File

@ -105,7 +105,7 @@ func GenThanosStoreGateway(gen *mimic.Generator, opts StoreGatewayOpts) {
}},
},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Port: intstr.FromInt(httpPort),
Path: func() string {
@ -122,7 +122,7 @@ func GenThanosStoreGateway(gen *mimic.Generator, opts StoreGatewayOpts) {
FailureThreshold: 3,
},
//LivenessProbe: &corev1.Probe{
// Handler: corev1.Handler{
// ProbeHandler: corev1.ProbeHandler{
// HTTPGet: &corev1.HTTPGetAction{
// Path: "/-/healthy",
// Port: intstr.FromInt(httpPort),

161
go.mod
View File

@ -2,73 +2,148 @@ module github.com/thanos-io/thanosbench
require (
github.com/bwplotka/mimic v0.0.0-20190730202618-06ab9976e8ef
github.com/cespare/xxhash/v2 v2.1.1
github.com/fatih/structtag v1.1.0
github.com/go-kit/kit v0.10.0
github.com/go-openapi/swag v0.19.9
github.com/cespare/xxhash/v2 v2.1.2
github.com/efficientgo/tools/extkingpin v0.0.0-20220801101838-3312908f6a9d
github.com/fatih/structtag v1.2.0
github.com/go-kit/log v0.2.1
github.com/go-openapi/swag v0.21.1
github.com/oklog/run v1.1.0
github.com/oklog/ulid v1.3.1
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.11.1
github.com/prometheus/prometheus v1.8.2-0.20200811193703-869f1bc587e6
github.com/thanos-io/thanos v0.15.0
go.uber.org/automaxprocs v1.2.0
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
google.golang.org/grpc v1.30.0
github.com/prometheus/common v0.37.0
github.com/prometheus/prometheus v0.38.0
github.com/thanos-io/objstore v0.0.0-20221006135717-79dcec7fe604
github.com/thanos-io/thanos v0.28.1
go.uber.org/automaxprocs v1.5.1
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
google.golang.org/grpc v1.48.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/yaml.v2 v2.3.0
k8s.io/api v0.18.6
k8s.io/apimachinery v0.18.6
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.24.3
k8s.io/apimachinery v0.24.3
)
require (
cloud.google.com/go v0.102.0 // indirect
cloud.google.com/go/compute v1.7.0 // indirect
cloud.google.com/go/iam v0.3.0 // indirect
cloud.google.com/go/storage v1.22.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.4.1 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible // indirect
github.com/aws/aws-sdk-go v1.44.72 // indirect
github.com/aws/aws-sdk-go-v2 v1.16.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.15.1 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.1 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.8 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.1 // indirect
github.com/aws/smithy-go v1.11.1 // indirect
github.com/baidubce/bce-sdk-go v0.9.111 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/clbanning/mxj v1.8.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/edsrzf/mmap-go v1.1.0 // indirect
github.com/efficientgo/tools/core v0.0.0-20220817170617-6c25e3b627dd // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logfmt/logfmt v0.5.0 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/gofuzz v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.1+incompatible // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.4.0 // indirect
github.com/googleapis/go-type-adapters v1.0.0 // indirect
github.com/grafana/regexp v0.0.0-20220304095617-2e8d9baf4ac2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/mailru/easyjson v0.7.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/cpuid/v2 v2.1.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/minio-go/v7 v7.0.37 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mozillazg/go-httpheader v0.2.1 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.7.1 // indirect
github.com/oracle/oci-go-sdk/v65 v65.13.0 // indirect
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.13.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/procfs v0.1.3 // indirect
github.com/prometheus/common/sigv4 v0.1.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rodaine/hclencoder v0.0.0-20190213202847-fb9757bb536e // indirect
go.uber.org/atomic v1.6.0 // indirect
go.uber.org/goleak v1.1.0 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
golang.org/x/text v0.3.3 // indirect
golang.org/x/tools v0.0.0-20200725200936-102e7d357031 // indirect
google.golang.org/genproto v0.0.0-20200724131911-43cab4749ae7 // indirect
google.golang.org/protobuf v1.24.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/stretchr/testify v1.8.0 // indirect
github.com/tencentyun/cos-go-sdk-v5 v0.7.34 // indirect
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.9.0 // indirect
go.opentelemetry.io/otel/trace v1.9.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced // indirect
golang.org/x/oauth2 v0.0.0-20220808172628-8227340efae7 // indirect
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220722155302-e5dcc9cfc0b9 // indirect
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect
google.golang.org/api v0.91.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220808204814-fd01256a5276 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
k8s.io/klog v1.0.0 // indirect
gopkg.in/ini.v1 v1.66.6 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.70.0 // indirect
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 // indirect
sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
)
// We want to replace the client-go version with a specific commit hash,
// so that we don't get errors about being incompatible with the Go proxies.
// See https://github.com/thanos-io/thanos/issues/1415
replace (
k8s.io/api => k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.0.0-20190620085554-14e95df34f1f
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go => k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
k8s.io/code-generator => k8s.io/code-generator v0.0.0-20190612205613-18da4a14b22b
k8s.io/klog => k8s.io/klog v0.3.1
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30
// Overriding to use latest commit.
gopkg.in/alecthomas/kingpin.v2 => github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497
// From Prometheus.
k8s.io/klog => github.com/simonpasquier/klog-gokit v0.3.0
k8s.io/klog/v2 => github.com/simonpasquier/klog-gokit/v3 v3.0.0
)
go 1.18

1467
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -10,9 +10,9 @@ import (
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"github.com/go-kit/kit/log"
"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanosbench/pkg/seriesgen"
@ -92,12 +92,12 @@ func Generate(ctx context.Context, logger log.Logger, goroutines int, dir string
}
bdir := path.Join(dir, id.String())
meta, err := metadata.Read(bdir)
meta, err := metadata.ReadFromDir(bdir)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "meta read")
}
meta.Thanos = block.Thanos
if err := metadata.Write(logger, bdir, meta); err != nil {
if err := meta.WriteToDir(logger, bdir); err != nil {
return ulid.ULID{}, errors.Wrap(err, "meta write")
}
return id, nil

View File

@ -5,9 +5,8 @@ import (
"fmt"
"time"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/model"

View File

@ -8,10 +8,10 @@ import (
"github.com/oklog/ulid"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
@ -97,7 +97,9 @@ func (w *BlockWriter) initHeadAndAppender() error {
// var w *wal.WAL = nil
// Put huge chunkRange; It has to be equal then expected block size.
// Since we don't have info about block size here, set it to large number.
h, err := tsdb.NewHead(nil, logger, nil, durToMilis(9999*time.Hour), "", nil, tsdb.DefaultStripeSize, nil)
opts := tsdb.DefaultHeadOptions()
opts.ChunkRange = durToMilis(9999 * time.Hour)
h, err := tsdb.NewHead(nil, logger, nil, opts, nil)
if err != nil {
return errors.Wrap(err, "tsdb.NewHead")
}
@ -132,7 +134,9 @@ func (w *BlockWriter) writeHeadToDisk() (ulid.ULID, error) {
nil,
w.logger,
[]int64{durToMilis(2 * time.Hour)}, // Does not matter, used only for planning.
chunkenc.NewPool())
chunkenc.NewPool(),
nil,
)
if err != nil {
return ulid.ULID{}, errors.Wrap(err, "create leveled compactor")
}

View File

@ -36,7 +36,7 @@ func Append(ctx context.Context, goroutines int, appendable storage.Appendable,
}
}
ref := uint64(0)
ref := storage.SeriesRef(0)
iter := s.Iterator()
for iter.Next() {
@ -44,24 +44,13 @@ func Append(ctx context.Context, goroutines int, appendable storage.Appendable,
return gctx.Err()
}
t, v := iter.At()
if ref == 0 {
ref, err = app.Add(s.Labels(), t, v)
if err != nil {
if rerr := app.Rollback(); rerr != nil {
err = errors.Wrapf(err, "rollback failed: %v", rerr)
}
return errors.Wrap(err, "add sample")
}
continue
}
if err = app.AddFast(ref, t, v); err != nil {
ref, err = app.Append(ref, s.Labels(), t, v)
if err != nil {
if rerr := app.Rollback(); rerr != nil {
err = errors.Wrapf(err, "rollback failed: %v", rerr)
}
return errors.Wrap(err, "add fast sample")
return errors.Wrap(err, "add sample")
}
}

View File

@ -9,8 +9,10 @@ import (
"testing"
"time"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/thanos-io/thanos/pkg/testutil"
)
@ -50,17 +52,24 @@ type testAppendable struct {
samples map[uint64][]sample
}
func (a *testAppendable) Add(l labels.Labels, t int64, v float64) (uint64, error) {
ref := l.Hash()
return ref, a.AddFast(ref, t, v)
}
func (a *testAppendable) AddFast(ref uint64, t int64, v float64) error {
func (a *testAppendable) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
hash := uint64(ref)
if hash == 0 {
hash = l.Hash()
}
a.mtx.Lock()
defer a.mtx.Unlock()
a.samples[ref] = append(a.samples[ref], sample{T: t, V: v})
return nil
a.samples[hash] = append(a.samples[hash], sample{T: t, V: v})
return storage.SeriesRef(hash), nil
}
func (a *testAppendable) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
return 0, nil
}
func (a *testAppendable) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
return 0, nil
}
func (a *testAppendable) Commit() error {

View File

@ -4,7 +4,7 @@ import (
"math/rand"
"time"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/model/labels"
)
type sample struct {

View File

@ -9,12 +9,12 @@ import (
"strings"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanosbench/pkg/seriesgen"
)
@ -56,7 +56,7 @@ func GenerateTSDBWAL(logger log.Logger, dir string, config Config) error {
MaxBlockDuration: maxBlockDuration.Milliseconds(),
RetentionDuration: config.Retention.Milliseconds(),
NoLockfile: true,
})
}, nil)
if err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)

View File

@ -8,8 +8,8 @@ import (
"reflect"
"github.com/fatih/structtag"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/thanos-io/thanosbench/pkg/blockgen"