mirror of https://github.com/thanos-io/thanos.git
Compare commits
13 Commits
v0.39.0-rc
...
main
Author | SHA1 | Date |
---|---|---|
|
3727363b49 | |
|
37254e5779 | |
|
4b31bbaa6b | |
|
d6ee898a06 | |
|
5a95d13802 | |
|
b54d293dbd | |
|
dfcbfe7c40 | |
|
8b738c55b1 | |
|
69624ecbf1 | |
|
0453c9b144 | |
|
9c955d21df | |
|
7de9c13e5f | |
|
34a98c8efb |
13
CHANGELOG.md
13
CHANGELOG.md
|
@ -8,7 +8,17 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking chan
|
||||||
|
|
||||||
We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)
|
We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)
|
||||||
|
|
||||||
### [v0.39.0-rc.0](https://github.com/thanos-io/thanos/tree/release-0.39) - 2025 06 19
|
## Unreleased
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
### Removed
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
## [v0.39.0](https://github.com/thanos-io/thanos/tree/release-0.39) - 2025 06 25
|
||||||
|
|
||||||
In short: there are a bunch of fixes and small improvements. The shining items in this release are memory usage improvements in Thanos Query and shuffle sharding support in Thanos Receiver. Information about shuffle sharding support is available in the documentation. Thank you to all contributors!
|
In short: there are a bunch of fixes and small improvements. The shining items in this release are memory usage improvements in Thanos Query and shuffle sharding support in Thanos Receiver. Information about shuffle sharding support is available in the documentation. Thank you to all contributors!
|
||||||
|
|
||||||
|
@ -33,7 +43,6 @@ In short: there are a bunch of fixes and small improvements. The shining items i
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
- [#8199](https://github.com/thanos-io/thanos/pull/8199) Query: handle panics or nil pointer dereference in querier gracefully when query analyze returns nil
|
- [#8199](https://github.com/thanos-io/thanos/pull/8199) Query: handle panics or nil pointer dereference in querier gracefully when query analyze returns nil
|
||||||
|
|
||||||
- [#8211](https://github.com/thanos-io/thanos/pull/8211) Query: fix panic on nested partial response in distributed instant query
|
- [#8211](https://github.com/thanos-io/thanos/pull/8211) Query: fix panic on nested partial response in distributed instant query
|
||||||
- [#8216](https://github.com/thanos-io/thanos/pull/8216) Query/Receive: fix iter race between `next()` and `stop()` introduced in https://github.com/thanos-io/thanos/pull/7821.
|
- [#8216](https://github.com/thanos-io/thanos/pull/8216) Query/Receive: fix iter race between `next()` and `stop()` introduced in https://github.com/thanos-io/thanos/pull/7821.
|
||||||
- [#8212](https://github.com/thanos-io/thanos/pull/8212) Receive: Ensure forward/replication metrics are incremented in err cases
|
- [#8212](https://github.com/thanos-io/thanos/pull/8212) Receive: Ensure forward/replication metrics are incremented in err cases
|
||||||
|
|
|
@ -112,8 +112,9 @@ type ruleConfig struct {
|
||||||
storeRateLimits store.SeriesSelectLimits
|
storeRateLimits store.SeriesSelectLimits
|
||||||
ruleConcurrentEval int64
|
ruleConcurrentEval int64
|
||||||
|
|
||||||
extendedFunctionsEnabled bool
|
extendedFunctionsEnabled bool
|
||||||
EnableFeatures []string
|
EnableFeatures []string
|
||||||
|
tsdbEnableNativeHistograms bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type Expression struct {
|
type Expression struct {
|
||||||
|
@ -170,6 +171,10 @@ func registerRule(app *extkingpin.App) {
|
||||||
cmd.Flag("query.enable-x-functions", "Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine.").Default("false").BoolVar(&conf.extendedFunctionsEnabled)
|
cmd.Flag("query.enable-x-functions", "Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine.").Default("false").BoolVar(&conf.extendedFunctionsEnabled)
|
||||||
cmd.Flag("enable-feature", "Comma separated feature names to enable. Valid options for now: promql-experimental-functions (enables promql experimental functions for ruler)").Default("").StringsVar(&conf.EnableFeatures)
|
cmd.Flag("enable-feature", "Comma separated feature names to enable. Valid options for now: promql-experimental-functions (enables promql experimental functions for ruler)").Default("").StringsVar(&conf.EnableFeatures)
|
||||||
|
|
||||||
|
cmd.Flag("tsdb.enable-native-histograms",
|
||||||
|
"[EXPERIMENTAL] Enables the ingestion of native histograms.").
|
||||||
|
Default("false").BoolVar(&conf.tsdbEnableNativeHistograms)
|
||||||
|
|
||||||
conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write configurations, that specify servers where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution())
|
conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write configurations, that specify servers where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution())
|
||||||
|
|
||||||
conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
|
conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
|
||||||
|
@ -189,11 +194,12 @@ func registerRule(app *extkingpin.App) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbOpts := &tsdb.Options{
|
tsdbOpts := &tsdb.Options{
|
||||||
MinBlockDuration: int64(time.Duration(*tsdbBlockDuration) / time.Millisecond),
|
MinBlockDuration: int64(time.Duration(*tsdbBlockDuration) / time.Millisecond),
|
||||||
MaxBlockDuration: int64(time.Duration(*tsdbBlockDuration) / time.Millisecond),
|
MaxBlockDuration: int64(time.Duration(*tsdbBlockDuration) / time.Millisecond),
|
||||||
RetentionDuration: int64(time.Duration(*tsdbRetention) / time.Millisecond),
|
RetentionDuration: int64(time.Duration(*tsdbRetention) / time.Millisecond),
|
||||||
NoLockfile: *noLockFile,
|
NoLockfile: *noLockFile,
|
||||||
WALCompression: wlog.ParseCompressionType(*walCompression, string(wlog.CompressionSnappy)),
|
WALCompression: wlog.ParseCompressionType(*walCompression, string(wlog.CompressionSnappy)),
|
||||||
|
EnableNativeHistograms: conf.tsdbEnableNativeHistograms,
|
||||||
}
|
}
|
||||||
|
|
||||||
agentOpts := &agent.Options{
|
agentOpts := &agent.Options{
|
||||||
|
|
|
@ -503,6 +503,9 @@ Flags:
|
||||||
options for now: promql-experimental-functions
|
options for now: promql-experimental-functions
|
||||||
(enables promql experimental functions for
|
(enables promql experimental functions for
|
||||||
ruler)
|
ruler)
|
||||||
|
--[no-]tsdb.enable-native-histograms
|
||||||
|
[EXPERIMENTAL] Enables the ingestion of native
|
||||||
|
histograms.
|
||||||
--remote-write.config-file=<file-path>
|
--remote-write.config-file=<file-path>
|
||||||
Path to YAML config for the remote-write
|
Path to YAML config for the remote-write
|
||||||
configurations, that specify servers
|
configurations, that specify servers
|
||||||
|
|
|
@ -7,11 +7,14 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
"github.com/go-kit/log/level"
|
"github.com/go-kit/log/level"
|
||||||
"github.com/oklog/ulid/v2"
|
"github.com/oklog/ulid/v2"
|
||||||
|
|
||||||
|
xsync "golang.org/x/sync/singleflight"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/thanos-io/objstore"
|
"github.com/thanos-io/objstore"
|
||||||
|
@ -49,6 +52,7 @@ type ReaderPool struct {
|
||||||
// Keep track of all readers managed by the pool.
|
// Keep track of all readers managed by the pool.
|
||||||
lazyReadersMx sync.Mutex
|
lazyReadersMx sync.Mutex
|
||||||
lazyReaders map[*LazyBinaryReader]struct{}
|
lazyReaders map[*LazyBinaryReader]struct{}
|
||||||
|
lazyReadersSF xsync.Group
|
||||||
|
|
||||||
lazyDownloadFunc LazyDownloadIndexHeaderFunc
|
lazyDownloadFunc LazyDownloadIndexHeaderFunc
|
||||||
}
|
}
|
||||||
|
@ -123,18 +127,16 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime
|
||||||
// with lazy reader enabled, this function will return a lazy reader. The returned lazy reader
|
// with lazy reader enabled, this function will return a lazy reader. The returned lazy reader
|
||||||
// is tracked by the pool and automatically closed once the idle timeout expires.
|
// is tracked by the pool and automatically closed once the idle timeout expires.
|
||||||
func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, meta *metadata.Meta) (Reader, error) {
|
func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, meta *metadata.Meta) (Reader, error) {
|
||||||
var reader Reader
|
if !p.lazyReaderEnabled {
|
||||||
var err error
|
return NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader)
|
||||||
|
|
||||||
if p.lazyReaderEnabled {
|
|
||||||
reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta))
|
|
||||||
} else {
|
|
||||||
reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
idBytes := id.Bytes()
|
||||||
return nil, err
|
lazyReader, err, _ := p.lazyReadersSF.Do(*(*string)(unsafe.Pointer(&idBytes)), func() (interface{}, error) {
|
||||||
}
|
return NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta))
|
||||||
|
})
|
||||||
|
|
||||||
|
reader := lazyReader.(Reader)
|
||||||
|
|
||||||
// Keep track of lazy readers only if required.
|
// Keep track of lazy readers only if required.
|
||||||
if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 {
|
if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 {
|
||||||
|
|
|
@ -6,12 +6,16 @@ package indexheader
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-kit/log"
|
"github.com/go-kit/log"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
|
promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
|
||||||
"github.com/prometheus/prometheus/model/labels"
|
"github.com/prometheus/prometheus/model/labels"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/thanos-io/objstore"
|
||||||
"github.com/thanos-io/objstore/providers/filesystem"
|
"github.com/thanos-io/objstore/providers/filesystem"
|
||||||
|
|
||||||
"github.com/efficientgo/core/testutil"
|
"github.com/efficientgo/core/testutil"
|
||||||
|
@ -132,3 +136,60 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) {
|
||||||
testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
|
testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
|
||||||
testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
|
testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReaderPool_MultipleReaders(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
blkDir := t.TempDir()
|
||||||
|
|
||||||
|
bkt := objstore.NewInMemBucket()
|
||||||
|
b1, err := e2eutil.CreateBlock(ctx, blkDir, []labels.Labels{
|
||||||
|
labels.New(labels.Label{Name: "a", Value: "1"}),
|
||||||
|
labels.New(labels.Label{Name: "a", Value: "2"}),
|
||||||
|
labels.New(labels.Label{Name: "a", Value: "3"}),
|
||||||
|
labels.New(labels.Label{Name: "a", Value: "4"}),
|
||||||
|
labels.New(labels.Label{Name: "b", Value: "1"}),
|
||||||
|
}, 100, 0, 1000, labels.New(labels.Label{Name: "ext1", Value: "val1"}), 124, metadata.NoneFunc, nil)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(blkDir, b1.String()), metadata.NoneFunc))
|
||||||
|
|
||||||
|
readerPool := NewReaderPool(
|
||||||
|
log.NewNopLogger(),
|
||||||
|
true,
|
||||||
|
time.Minute,
|
||||||
|
NewReaderPoolMetrics(prometheus.NewRegistry()),
|
||||||
|
AlwaysEagerDownloadIndexHeader,
|
||||||
|
)
|
||||||
|
|
||||||
|
dlDir := t.TempDir()
|
||||||
|
|
||||||
|
m, err := metadata.ReadFromDir(filepath.Join(blkDir, b1.String()))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
startWg := &sync.WaitGroup{}
|
||||||
|
startWg.Add(1)
|
||||||
|
|
||||||
|
waitWg := &sync.WaitGroup{}
|
||||||
|
|
||||||
|
const readersCount = 10
|
||||||
|
waitWg.Add(readersCount)
|
||||||
|
for i := 0; i < readersCount; i++ {
|
||||||
|
go func() {
|
||||||
|
defer waitWg.Done()
|
||||||
|
t.Logf("waiting")
|
||||||
|
startWg.Wait()
|
||||||
|
t.Logf("starting")
|
||||||
|
|
||||||
|
br, err := readerPool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, dlDir, b1, 32, m)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
testutil.Ok(t, br.Close())
|
||||||
|
})
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
startWg.Done()
|
||||||
|
waitWg.Wait()
|
||||||
|
}
|
||||||
|
|
|
@ -814,9 +814,10 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
|
|
||||||
req *storepb.SeriesRequest
|
req *storepb.SeriesRequest
|
||||||
|
|
||||||
expectedSeries []rawSeries
|
expectedSeries []rawSeries
|
||||||
expectedErr error
|
expectedErr error
|
||||||
expectedWarningsLen int
|
expectedWarningsLen int
|
||||||
|
expectTimeoutBehavior bool
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
title: "partial response disabled; 1st errors out after some delay; 2nd store is fast",
|
title: "partial response disabled; 1st errors out after some delay; 2nd store is fast",
|
||||||
|
@ -1210,7 +1211,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
expectedErr: errors.New("rpc error: code = Aborted desc = warning"),
|
expectedErr: errors.New("rpc error: code = Aborted desc = warning"),
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
title: "partial response disabled; all stores respond 3s",
|
title: "partial response disabled; all stores respond with timeout",
|
||||||
storeAPIs: []Client{
|
storeAPIs: []Client{
|
||||||
&storetestutil.TestClient{
|
&storetestutil.TestClient{
|
||||||
StoreClient: &mockedStoreAPI{
|
StoreClient: &mockedStoreAPI{
|
||||||
|
@ -1219,7 +1220,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}),
|
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}),
|
||||||
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}),
|
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}),
|
||||||
},
|
},
|
||||||
RespDuration: 3 * time.Second,
|
RespDuration: 2 * time.Second,
|
||||||
},
|
},
|
||||||
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
|
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
|
||||||
MinTime: 1,
|
MinTime: 1,
|
||||||
|
@ -1239,10 +1240,11 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}},
|
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedErr: errors.New("rpc error: code = Aborted desc = receive series from : context deadline exceeded"),
|
expectedErr: errors.New("rpc error: code = Aborted desc = failed to receive any data in 1.5s from : context canceled"),
|
||||||
|
expectTimeoutBehavior: true,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
title: "partial response enabled; all stores respond 3s",
|
title: "partial response enabled; all stores respond with manageable timing",
|
||||||
storeAPIs: []Client{
|
storeAPIs: []Client{
|
||||||
&storetestutil.TestClient{
|
&storetestutil.TestClient{
|
||||||
StoreClient: &mockedStoreAPI{
|
StoreClient: &mockedStoreAPI{
|
||||||
|
@ -1251,7 +1253,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}),
|
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}),
|
||||||
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}),
|
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}),
|
||||||
},
|
},
|
||||||
RespDuration: 3 * time.Second,
|
RespDuration: 1 * time.Second,
|
||||||
},
|
},
|
||||||
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
|
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
|
||||||
MinTime: 1,
|
MinTime: 1,
|
||||||
|
@ -1264,7 +1266,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{4, 1}, {5, 2}, {6, 3}}),
|
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{4, 1}, {5, 2}, {6, 3}}),
|
||||||
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{7, 1}, {8, 2}, {9, 3}}),
|
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{7, 1}, {8, 2}, {9, 3}}),
|
||||||
},
|
},
|
||||||
RespDuration: 3 * time.Second,
|
RespDuration: 1 * time.Second,
|
||||||
},
|
},
|
||||||
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
|
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
|
||||||
MinTime: 1,
|
MinTime: 1,
|
||||||
|
@ -1281,12 +1283,16 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
lset: labels.FromStrings("a", "b"),
|
lset: labels.FromStrings("a", "b"),
|
||||||
chunks: [][]sample{
|
chunks: [][]sample{
|
||||||
{{1, 1}, {2, 2}, {3, 3}},
|
{{1, 1}, {2, 2}, {3, 3}},
|
||||||
|
{{4, 1}, {5, 2}, {6, 3}},
|
||||||
|
{{7, 1}, {8, 2}, {9, 3}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
lset: labels.FromStrings("b", "c"),
|
lset: labels.FromStrings("b", "c"),
|
||||||
chunks: [][]sample{
|
chunks: [][]sample{
|
||||||
{{1, 1}, {2, 2}, {3, 3}},
|
{{1, 1}, {2, 2}, {3, 3}},
|
||||||
|
{{4, 1}, {5, 2}, {6, 3}},
|
||||||
|
{{7, 1}, {8, 2}, {9, 3}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -1300,22 +1306,31 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
if ok := t.Run(tc.title, func(t *testing.T) {
|
if ok := t.Run(tc.title, func(t *testing.T) {
|
||||||
for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
|
for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
|
||||||
if ok := t.Run(string(strategy), func(t *testing.T) {
|
if ok := t.Run(string(strategy), func(t *testing.T) {
|
||||||
|
// Use more reasonable timeouts
|
||||||
|
proxyTimeout := 3 * time.Second
|
||||||
|
contextTimeout := 4 * time.Second
|
||||||
|
|
||||||
|
if tc.expectTimeoutBehavior {
|
||||||
|
// For timeout tests, use shorter timeouts
|
||||||
|
proxyTimeout = 1500 * time.Millisecond
|
||||||
|
contextTimeout = 2 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
q := NewProxyStore(nil,
|
q := NewProxyStore(nil,
|
||||||
nil,
|
nil,
|
||||||
func() []Client { return tc.storeAPIs },
|
func() []Client { return tc.storeAPIs },
|
||||||
component.Query,
|
component.Query,
|
||||||
tc.selectorLabels,
|
tc.selectorLabels,
|
||||||
4*time.Second, strategy,
|
proxyTimeout, strategy,
|
||||||
options...,
|
options...,
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
s := newStoreSeriesServer(ctx)
|
s := newStoreSeriesServer(ctx)
|
||||||
|
|
||||||
t0 := time.Now()
|
|
||||||
err := q.Series(tc.req, s)
|
err := q.Series(tc.req, s)
|
||||||
elapsedTime := time.Since(t0)
|
|
||||||
if tc.expectedErr != nil {
|
if tc.expectedErr != nil {
|
||||||
testutil.NotOk(t, err)
|
testutil.NotOk(t, err)
|
||||||
testutil.Equals(t, tc.expectedErr.Error(), err.Error())
|
testutil.Equals(t, tc.expectedErr.Error(), err.Error())
|
||||||
|
@ -1327,7 +1342,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
seriesEquals(t, tc.expectedSeries, s.SeriesSet)
|
seriesEquals(t, tc.expectedSeries, s.SeriesSet)
|
||||||
testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings)
|
testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings)
|
||||||
|
|
||||||
testutil.Assert(t, elapsedTime < 5010*time.Millisecond, fmt.Sprintf("Request has taken %f, expected: <%d, it seems that responseTimeout doesn't work properly.", elapsedTime.Seconds(), 5))
|
// Note: We avoid timing assertions as they are flaky in CI environments
|
||||||
|
|
||||||
}); !ok {
|
}); !ok {
|
||||||
return
|
return
|
||||||
|
@ -1340,7 +1355,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
|
||||||
|
|
||||||
// Wait until the last goroutine exits which is stuck on time.Sleep().
|
// Wait until the last goroutine exits which is stuck on time.Sleep().
|
||||||
// Otherwise, goleak complains.
|
// Otherwise, goleak complains.
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
|
func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
|
||||||
|
|
|
@ -777,6 +777,7 @@ type RulerBuilder struct {
|
||||||
evalInterval string
|
evalInterval string
|
||||||
forGracePeriod string
|
forGracePeriod string
|
||||||
restoreIgnoredLabels []string
|
restoreIgnoredLabels []string
|
||||||
|
nativeHistograms bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRulerBuilder is a Ruler future that allows extra configuration before initialization.
|
// NewRulerBuilder is a Ruler future that allows extra configuration before initialization.
|
||||||
|
@ -827,6 +828,11 @@ func (r *RulerBuilder) WithRestoreIgnoredLabels(labels ...string) *RulerBuilder
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *RulerBuilder) WithNativeHistograms() *RulerBuilder {
|
||||||
|
r.nativeHistograms = true
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
func (r *RulerBuilder) InitTSDB(internalRuleDir string, queryCfg []clientconfig.Config) *e2eobs.Observable {
|
func (r *RulerBuilder) InitTSDB(internalRuleDir string, queryCfg []clientconfig.Config) *e2eobs.Observable {
|
||||||
return r.initRule(internalRuleDir, queryCfg, nil)
|
return r.initRule(internalRuleDir, queryCfg, nil)
|
||||||
}
|
}
|
||||||
|
@ -894,6 +900,10 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []clientconfig.
|
||||||
ruleArgs["--remote-write.config"] = string(rwCfgBytes)
|
ruleArgs["--remote-write.config"] = string(rwCfgBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if r.nativeHistograms {
|
||||||
|
ruleArgs["--tsdb.enable-native-histograms"] = ""
|
||||||
|
}
|
||||||
|
|
||||||
args := e2e.BuildArgs(ruleArgs)
|
args := e2e.BuildArgs(ruleArgs)
|
||||||
|
|
||||||
for _, label := range r.restoreIgnoredLabels {
|
for _, label := range r.restoreIgnoredLabels {
|
||||||
|
|
|
@ -394,6 +394,64 @@ func TestRuleNativeHistograms(t *testing.T) {
|
||||||
queryAndAssert(t, ctx, q.Endpoint("http"), func() string { return expectedRecordedName }, time.Now, promclient.QueryOptions{Deduplicate: true}, expectedHistogramModelVector(expectedRecordedName, nil, expectedRecordedHistogram, map[string]string{"tenant_id": "default-tenant"}))
|
queryAndAssert(t, ctx, q.Endpoint("http"), func() string { return expectedRecordedName }, time.Now, promclient.QueryOptions{Deduplicate: true}, expectedHistogramModelVector(expectedRecordedName, nil, expectedRecordedHistogram, map[string]string{"tenant_id": "default-tenant"}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRuleNativeHistogramsTSDB(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
e, err := e2e.NewDockerEnvironment("hist-rule-tsdb")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
t.Cleanup(e2ethanos.CleanScenario(t, e))
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
rFuture := e2ethanos.NewRulerBuilder(e, "1").WithNativeHistograms()
|
||||||
|
rulesSubDir := "rules"
|
||||||
|
rulesPath := filepath.Join(rFuture.Dir(), rulesSubDir)
|
||||||
|
testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm))
|
||||||
|
|
||||||
|
for i, rule := range []string{testRuleRecordHistogramSum} {
|
||||||
|
createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-%d.yaml", i)), rule)
|
||||||
|
}
|
||||||
|
|
||||||
|
receiver := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled().WithNativeHistograms().Init()
|
||||||
|
testutil.Ok(t, e2e.StartAndWaitReady(receiver))
|
||||||
|
|
||||||
|
receiver2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled().WithNativeHistograms().Init()
|
||||||
|
testutil.Ok(t, e2e.StartAndWaitReady(receiver2))
|
||||||
|
|
||||||
|
q := e2ethanos.NewQuerierBuilder(e, "1", receiver.InternalEndpoint("grpc"), receiver2.InternalEndpoint("grpc")).WithReplicaLabels("receive", "replica").Init()
|
||||||
|
testutil.Ok(t, e2e.StartAndWaitReady(q))
|
||||||
|
|
||||||
|
histograms := tsdbutil.GenerateTestHistograms(4)
|
||||||
|
ts := time.Now().Add(-2 * time.Minute)
|
||||||
|
rawRemoteWriteURL1 := "http://" + receiver.Endpoint("remote-write") + "/api/v1/receive"
|
||||||
|
_, err = writeHistograms(ctx, ts, testHistogramMetricName, histograms, nil, rawRemoteWriteURL1, prompb.Label{Name: "series", Value: "one"})
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
rawRemoteWriteURL2 := "http://" + receiver2.Endpoint("remote-write") + "/api/v1/receive"
|
||||||
|
_, err = writeHistograms(ctx, ts, testHistogramMetricName, histograms, nil, rawRemoteWriteURL2, prompb.Label{Name: "series", Value: "two"})
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
r := rFuture.InitTSDB(filepath.Join(rFuture.InternalDir(), rulesSubDir), []clientconfig.Config{
|
||||||
|
{
|
||||||
|
GRPCConfig: &clientconfig.GRPCConfig{
|
||||||
|
EndpointAddrs: []string{q.InternalEndpoint("grpc")},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
testutil.Ok(t, e2e.StartAndWaitReady(r))
|
||||||
|
|
||||||
|
qR := e2ethanos.NewQuerierBuilder(e, "2", r.InternalEndpoint("grpc")).Init()
|
||||||
|
testutil.Ok(t, e2e.StartAndWaitReady(qR))
|
||||||
|
|
||||||
|
// Wait until samples are written successfully.
|
||||||
|
histogramMatcher, _ := matchers.NewMatcher(matchers.MatchEqual, "type", "histogram")
|
||||||
|
testutil.Ok(t, r.WaitSumMetricsWithOptions(e2emon.GreaterOrEqual(1), []string{"prometheus_tsdb_head_samples_appended_total"}, e2emon.WithLabelMatchers(histogramMatcher), e2emon.WaitMissingMetrics()))
|
||||||
|
|
||||||
|
expectedRecordedName := testHistogramMetricName + ":sum"
|
||||||
|
expectedRecordedHistogram := histograms[len(histograms)-1].ToFloat(nil).Mul(2)
|
||||||
|
queryAndAssert(t, ctx, qR.Endpoint("http"), func() string { return expectedRecordedName }, time.Now, promclient.QueryOptions{Deduplicate: true}, expectedHistogramModelVector(expectedRecordedName, nil, expectedRecordedHistogram, nil))
|
||||||
|
}
|
||||||
|
|
||||||
func writeHistograms(ctx context.Context, now time.Time, name string, histograms []*histogram.Histogram, floatHistograms []*histogram.FloatHistogram, rawRemoteWriteURL string, labels ...prompb.Label) (time.Time, error) {
|
func writeHistograms(ctx context.Context, now time.Time, name string, histograms []*histogram.Histogram, floatHistograms []*histogram.FloatHistogram, rawRemoteWriteURL string, labels ...prompb.Label) (time.Time, error) {
|
||||||
startTime := now.Add(time.Duration(len(histograms)-1) * -30 * time.Second).Truncate(30 * time.Second)
|
startTime := now.Add(time.Duration(len(histograms)-1) * -30 * time.Second).Truncate(30 * time.Second)
|
||||||
prompbHistograms := make([]prompb.Histogram, 0, len(histograms))
|
prompbHistograms := make([]prompb.Histogram, 0, len(histograms))
|
||||||
|
|
|
@ -34,6 +34,7 @@ import (
|
||||||
|
|
||||||
"github.com/efficientgo/core/testutil"
|
"github.com/efficientgo/core/testutil"
|
||||||
|
|
||||||
|
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
|
||||||
"github.com/thanos-io/thanos/pkg/promclient"
|
"github.com/thanos-io/thanos/pkg/promclient"
|
||||||
"github.com/thanos-io/thanos/pkg/receive"
|
"github.com/thanos-io/thanos/pkg/receive"
|
||||||
"github.com/thanos-io/thanos/pkg/runutil"
|
"github.com/thanos-io/thanos/pkg/runutil"
|
||||||
|
@ -1210,7 +1211,7 @@ func TestReceiveCpnp(t *testing.T) {
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
t.Cleanup(e2ethanos.CleanScenario(t, e))
|
t.Cleanup(e2ethanos.CleanScenario(t, e))
|
||||||
|
|
||||||
i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
|
i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().WithExemplarsInMemStorage(100).Init()
|
||||||
testutil.Ok(t, e2e.StartAndWaitReady(i))
|
testutil.Ok(t, e2e.StartAndWaitReady(i))
|
||||||
|
|
||||||
h := receive.HashringConfig{
|
h := receive.HashringConfig{
|
||||||
|
@ -1232,6 +1233,15 @@ func TestReceiveCpnp(t *testing.T) {
|
||||||
return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{
|
return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{
|
||||||
Timeseries: []prompb.TimeSeries{
|
Timeseries: []prompb.TimeSeries{
|
||||||
{
|
{
|
||||||
|
Exemplars: []prompb.Exemplar{
|
||||||
|
{
|
||||||
|
Labels: []prompb.Label{
|
||||||
|
{Name: "receive", Value: "receive-ingestor"},
|
||||||
|
},
|
||||||
|
Value: 1.2345,
|
||||||
|
Timestamp: timestamp.FromTime(ts),
|
||||||
|
},
|
||||||
|
},
|
||||||
Labels: []prompb.Label{
|
Labels: []prompb.Label{
|
||||||
{Name: model.MetricNameLabel, Value: "myself"},
|
{Name: model.MetricNameLabel, Value: "myself"},
|
||||||
},
|
},
|
||||||
|
@ -1265,4 +1275,12 @@ func TestReceiveCpnp(t *testing.T) {
|
||||||
},
|
},
|
||||||
}, v)
|
}, v)
|
||||||
|
|
||||||
|
// TODO(GiedriusS): repro for https://github.com/thanos-io/thanos/issues/8224. Fix in following PRs.
|
||||||
|
queryExemplars(
|
||||||
|
t, context.Background(), q.Endpoint("http"), "myself", timestamp.FromTime(ts), timestamp.FromTime(ts), func(data []*exemplarspb.ExemplarData) error {
|
||||||
|
require.Equal(t, "\000\000\000\000\000\000\000", data[0].Exemplars[0].Labels.Labels[0].Name)
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,8 @@ groups:
|
||||||
- name: example_abort
|
- name: example_abort
|
||||||
interval: 1s
|
interval: 1s
|
||||||
# Abort should be a default: partial_response_strategy: "ABORT"
|
# Abort should be a default: partial_response_strategy: "ABORT"
|
||||||
|
labels:
|
||||||
|
foo: bar
|
||||||
rules:
|
rules:
|
||||||
- alert: TestAlert_AbortOnPartialResponse
|
- alert: TestAlert_AbortOnPartialResponse
|
||||||
# It must be based on actual metrics otherwise call to StoreAPI would be not involved.
|
# It must be based on actual metrics otherwise call to StoreAPI would be not involved.
|
||||||
|
@ -501,6 +503,7 @@ func TestRule(t *testing.T) {
|
||||||
"__name__": "ALERTS",
|
"__name__": "ALERTS",
|
||||||
"severity": "page",
|
"severity": "page",
|
||||||
"alertname": "TestAlert_AbortOnPartialResponse",
|
"alertname": "TestAlert_AbortOnPartialResponse",
|
||||||
|
"foo": "bar",
|
||||||
"alertstate": "firing",
|
"alertstate": "firing",
|
||||||
"replica": "1",
|
"replica": "1",
|
||||||
},
|
},
|
||||||
|
@ -521,11 +524,6 @@ func TestRule(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
expAlertLabels := []model.LabelSet{
|
expAlertLabels := []model.LabelSet{
|
||||||
{
|
|
||||||
"severity": "page",
|
|
||||||
"alertname": "TestAlert_AbortOnPartialResponse",
|
|
||||||
"replica": "1",
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"severity": "page",
|
"severity": "page",
|
||||||
"alertname": "TestAlert_HasBeenLoadedViaWebHandler",
|
"alertname": "TestAlert_HasBeenLoadedViaWebHandler",
|
||||||
|
@ -536,6 +534,12 @@ func TestRule(t *testing.T) {
|
||||||
"alertname": "TestAlert_WarnOnPartialResponse",
|
"alertname": "TestAlert_WarnOnPartialResponse",
|
||||||
"replica": "1",
|
"replica": "1",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"severity": "page",
|
||||||
|
"foo": "bar",
|
||||||
|
"alertname": "TestAlert_AbortOnPartialResponse",
|
||||||
|
"replica": "1",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
alrts, err := promclient.NewDefaultClient().AlertmanagerAlerts(ctx, urlParse(t, "http://"+am2.Endpoint("http")))
|
alrts, err := promclient.NewDefaultClient().AlertmanagerAlerts(ctx, urlParse(t, "http://"+am2.Endpoint("http")))
|
||||||
|
|
|
@ -102,6 +102,7 @@ func TestRulesAPI_Fanout(t *testing.T) {
|
||||||
State: rulespb.AlertState_FIRING,
|
State: rulespb.AlertState_FIRING,
|
||||||
Query: "absent(some_metric)",
|
Query: "absent(some_metric)",
|
||||||
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
|
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
|
||||||
|
{Name: "foo", Value: "bar"},
|
||||||
{Name: "prometheus", Value: "ha"},
|
{Name: "prometheus", Value: "ha"},
|
||||||
{Name: "severity", Value: "page"},
|
{Name: "severity", Value: "page"},
|
||||||
}},
|
}},
|
||||||
|
@ -118,6 +119,7 @@ func TestRulesAPI_Fanout(t *testing.T) {
|
||||||
State: rulespb.AlertState_FIRING,
|
State: rulespb.AlertState_FIRING,
|
||||||
Query: "absent(some_metric)",
|
Query: "absent(some_metric)",
|
||||||
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
|
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
|
||||||
|
{Name: "foo", Value: "bar"},
|
||||||
{Name: "severity", Value: "page"},
|
{Name: "severity", Value: "page"},
|
||||||
}},
|
}},
|
||||||
Health: string(rules.HealthGood),
|
Health: string(rules.HealthGood),
|
||||||
|
|
Loading…
Reference in New Issue