mirror of https://github.com/thanos-io/thanos.git

Compare commits: v0.39.0-rc...main (13 commits)

3727363b49 · 37254e5779 · 4b31bbaa6b · d6ee898a06 · 5a95d13802 · b54d293dbd · dfcbfe7c40 · 8b738c55b1 · 69624ecbf1 · 0453c9b144 · 9c955d21df · 7de9c13e5f · 34a98c8efb

CHANGELOG.md
@@ -8,7 +8,17 @@ NOTE: As semantic versioning states all 0.y.z releases can contain breaking changes.

 We use *breaking :warning:* to mark changes that are not backward compatible (relates only to v0.y.z releases.)

-## [v0.39.0-rc.0](https://github.com/thanos-io/thanos/tree/release-0.39) - 2025-06-19
+## Unreleased
+
+### Added
+
+### Changed
+
+### Removed
+
+### Fixed
+
+## [v0.39.0](https://github.com/thanos-io/thanos/tree/release-0.39) - 2025-06-25

 In short: there are a bunch of fixes and small improvements. The shining items in this release are memory usage improvements in Thanos Query and shuffle sharding support in Thanos Receiver. Information about shuffle sharding support is available in the documentation. Thank you to all contributors!

@@ -33,7 +43,6 @@ In short: there are a bunch of fixes and small improvements.

 ### Fixed
 - [#8199](https://github.com/thanos-io/thanos/pull/8199) Query: handle panics or nil pointer dereference in querier gracefully when query analyze returns nil
-
 - [#8211](https://github.com/thanos-io/thanos/pull/8211) Query: fix panic on nested partial response in distributed instant query
 - [#8216](https://github.com/thanos-io/thanos/pull/8216) Query/Receive: fix iter race between `next()` and `stop()` introduced in https://github.com/thanos-io/thanos/pull/7821.
 - [#8212](https://github.com/thanos-io/thanos/pull/8212) Receive: Ensure forward/replication metrics are incremented in err cases
@@ -114,6 +114,7 @@ type ruleConfig struct {

 	extendedFunctionsEnabled   bool
+	EnableFeatures             []string
 	tsdbEnableNativeHistograms bool
 }

 type Expression struct {
@@ -170,6 +171,10 @@ func registerRule(app *extkingpin.App) {
 	cmd.Flag("query.enable-x-functions", "Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine.").Default("false").BoolVar(&conf.extendedFunctionsEnabled)
+	cmd.Flag("enable-feature", "Comma separated feature names to enable. Valid options for now: promql-experimental-functions (enables promql experimental functions for ruler)").Default("").StringsVar(&conf.EnableFeatures)

+	cmd.Flag("tsdb.enable-native-histograms",
+		"[EXPERIMENTAL] Enables the ingestion of native histograms.").
+		Default("false").BoolVar(&conf.tsdbEnableNativeHistograms)

 	conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write configurations, that specify servers where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution())

 	conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)
@@ -194,6 +199,7 @@ func registerRule(app *extkingpin.App) {
 		RetentionDuration: int64(time.Duration(*tsdbRetention) / time.Millisecond),
 		NoLockfile:        *noLockFile,
 		WALCompression:    wlog.ParseCompressionType(*walCompression, string(wlog.CompressionSnappy)),
+		EnableNativeHistograms: conf.tsdbEnableNativeHistograms,
 	}

 	agentOpts := &agent.Options{
@@ -503,6 +503,9 @@ Flags:
                                  options for now: promql-experimental-functions
                                  (enables promql experimental functions for
                                  ruler)
+      --[no-]tsdb.enable-native-histograms
+                                 [EXPERIMENTAL] Enables the ingestion of native
+                                 histograms.
       --remote-write.config-file=<file-path>
                                  Path to YAML config for the remote-write
                                  configurations, that specify servers
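For orientation: `--enable-feature` is registered with `StringsVar`, so it can be passed repeatedly and each value may itself be a comma-separated list. The diff does not show how the ruler consumes it; the sketch below is an illustrative reading (an assumption, not the actual Thanos wiring), using `parser.EnableExperimentalFunctions`, the real global switch in the Prometheus PromQL parser:

```go
// Illustrative sketch (assumption, not the actual Thanos wiring) of how a
// ruler could act on --enable-feature values collected into EnableFeatures.
package main

import (
	"strings"

	"github.com/prometheus/prometheus/promql/parser"
)

const promqlExperimentalFunctions = "promql-experimental-functions"

// applyFeatures flips the relevant globals for each recognized feature name.
func applyFeatures(enableFeatures []string) {
	for _, value := range enableFeatures {
		// The flag is repeatable and each value may be a comma-separated list.
		for _, feature := range strings.Split(value, ",") {
			if strings.TrimSpace(feature) == promqlExperimentalFunctions {
				// Global switch in the Prometheus PromQL parser package.
				parser.EnableExperimentalFunctions = true
			}
		}
	}
}

func main() {
	applyFeatures([]string{"promql-experimental-functions"})
}
```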
@@ -7,11 +7,14 @@ import (
 	"context"
 	"sync"
 	"time"
+	"unsafe"

 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/oklog/ulid/v2"

+	xsync "golang.org/x/sync/singleflight"
+
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/thanos-io/objstore"
@@ -49,6 +52,7 @@ type ReaderPool struct {
 	// Keep track of all readers managed by the pool.
 	lazyReadersMx sync.Mutex
 	lazyReaders   map[*LazyBinaryReader]struct{}
+	lazyReadersSF xsync.Group

 	lazyDownloadFunc LazyDownloadIndexHeaderFunc
 }
@@ -123,18 +127,16 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTimeout time.Duration, ...
 // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader
 // is tracked by the pool and automatically closed once the idle timeout expires.
 func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, meta *metadata.Meta) (Reader, error) {
-	var reader Reader
-	var err error
-
-	if p.lazyReaderEnabled {
-		reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta))
-	} else {
-		reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader)
+	if !p.lazyReaderEnabled {
+		return NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader)
 	}

+	idBytes := id.Bytes()
+	lazyReader, err, _ := p.lazyReadersSF.Do(*(*string)(unsafe.Pointer(&idBytes)), func() (interface{}, error) {
+		return NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta))
+	})
 	if err != nil {
 		return nil, err
 	}

+	reader := lazyReader.(Reader)

 	// Keep track of lazy readers only if required.
 	if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 {
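The rewritten `NewBinaryReader` above routes all concurrent requests for the same block through `golang.org/x/sync/singleflight`: only one `NewLazyBinaryReader` call runs per block ID while the other callers wait and share its result. A self-contained sketch of that pattern (illustrative key and values, not Thanos code):

```go
// Demonstrates singleflight deduplication keyed by a block ID, the same
// pattern the ReaderPool change above uses. All names are illustrative.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"golang.org/x/sync/singleflight"
)

func main() {
	var (
		group  singleflight.Group
		builds atomic.Int64 // counts how many times the expensive build ran
		wg     sync.WaitGroup
	)

	build := func() (interface{}, error) {
		builds.Add(1) // expensive work, e.g. downloading an index header
		return "reader-for-block", nil
	}

	// Ten goroutines request a reader for the same block concurrently.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err, shared := group.Do("01H...BLOCK-ULID", build)
			_ = shared // true when the result was shared with other callers
			if err == nil {
				_ = v.(string)
			}
		}()
	}
	wg.Wait()

	fmt.Println("builds:", builds.Load()) // typically 1, never 10
}
```

The diff keys the group with `*(*string)(unsafe.Pointer(&idBytes))`, a zero-copy `[]byte`-to-`string` conversion; `id.String()` would behave the same at the cost of an allocation.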
@@ -6,12 +6,16 @@ package indexheader

 import (
 	"context"
+	"path/filepath"
+	"sync"
 	"testing"
 	"time"

 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
 	promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/providers/filesystem"

 	"github.com/efficientgo/core/testutil"
@@ -132,3 +136,60 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) {
 	testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
 	testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
 }
+
+func TestReaderPool_MultipleReaders(t *testing.T) {
+	ctx := context.Background()
+
+	blkDir := t.TempDir()
+
+	bkt := objstore.NewInMemBucket()
+	b1, err := e2eutil.CreateBlock(ctx, blkDir, []labels.Labels{
+		labels.New(labels.Label{Name: "a", Value: "1"}),
+		labels.New(labels.Label{Name: "a", Value: "2"}),
+		labels.New(labels.Label{Name: "a", Value: "3"}),
+		labels.New(labels.Label{Name: "a", Value: "4"}),
+		labels.New(labels.Label{Name: "b", Value: "1"}),
+	}, 100, 0, 1000, labels.New(labels.Label{Name: "ext1", Value: "val1"}), 124, metadata.NoneFunc, nil)
+	testutil.Ok(t, err)
+
+	require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(blkDir, b1.String()), metadata.NoneFunc))
+
+	readerPool := NewReaderPool(
+		log.NewNopLogger(),
+		true,
+		time.Minute,
+		NewReaderPoolMetrics(prometheus.NewRegistry()),
+		AlwaysEagerDownloadIndexHeader,
+	)
+
+	dlDir := t.TempDir()
+
+	m, err := metadata.ReadFromDir(filepath.Join(blkDir, b1.String()))
+	testutil.Ok(t, err)
+
+	startWg := &sync.WaitGroup{}
+	startWg.Add(1)
+
+	waitWg := &sync.WaitGroup{}
+
+	const readersCount = 10
+	waitWg.Add(readersCount)
+	for i := 0; i < readersCount; i++ {
+		go func() {
+			defer waitWg.Done()
+			t.Logf("waiting")
+			startWg.Wait()
+			t.Logf("starting")
+
+			br, err := readerPool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, dlDir, b1, 32, m)
+			testutil.Ok(t, err)
+
+			t.Cleanup(func() {
+				testutil.Ok(t, br.Close())
+			})
+		}()
+	}
+
+	startWg.Done()
+	waitWg.Wait()
+}
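The new test above coordinates its goroutines with a "start gun": a one-count `WaitGroup` that every worker blocks on, so all ten `NewBinaryReader` calls hit the pool at the same instant and exercise the singleflight path under real contention. The pattern in isolation (illustrative sketch, not Thanos code):

```go
// Stripped-down "start gun" pattern from the test above: hold N workers
// at a barrier, then release them together to maximize contention.
package main

import (
	"fmt"
	"sync"
)

func main() {
	var start sync.WaitGroup // the gun: workers wait until Done() fires
	start.Add(1)

	var done sync.WaitGroup // joins all workers at the end
	const workers = 10
	done.Add(workers)

	for i := 0; i < workers; i++ {
		go func(id int) {
			defer done.Done()
			start.Wait() // every goroutine parks here...
			fmt.Println("worker", id, "racing now")
		}(i)
	}

	start.Done() // ...and this releases them all simultaneously
	done.Wait()
}
```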
@@ -817,6 +817,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
 		expectedSeries        []rawSeries
 		expectedErr           error
 		expectedWarningsLen   int
+		expectTimeoutBehavior bool
 	}{
 		{
 			title: "partial response disabled; 1st errors out after some delay; 2nd store is fast",
@@ -1210,7 +1211,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
 			expectedErr: errors.New("rpc error: code = Aborted desc = warning"),
 		},
 		{
-			title: "partial response disabled; all stores respond 3s",
+			title: "partial response disabled; all stores respond with timeout",
 			storeAPIs: []Client{
 				&storetestutil.TestClient{
 					StoreClient: &mockedStoreAPI{
@@ -1219,7 +1220,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
 						storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}),
 						storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}),
 					},
-					RespDuration: 3 * time.Second,
+					RespDuration: 2 * time.Second,
 				},
 				ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
 				MinTime: 1,
@@ -1239,10 +1240,11 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
 					chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}},
 				},
 			},
-			expectedErr: errors.New("rpc error: code = Aborted desc = receive series from : context deadline exceeded"),
+			expectedErr:           errors.New("rpc error: code = Aborted desc = failed to receive any data in 1.5s from : context canceled"),
+			expectTimeoutBehavior: true,
 		},
 		{
-			title: "partial response enabled; all stores respond 3s",
+			title: "partial response enabled; all stores respond with manageable timing",
 			storeAPIs: []Client{
 				&storetestutil.TestClient{
 					StoreClient: &mockedStoreAPI{
@@ -1251,7 +1253,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
 						storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{4, 1}, {5, 2}, {6, 3}}),
 						storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{7, 1}, {8, 2}, {9, 3}}),
 					},
-					RespDuration: 3 * time.Second,
+					RespDuration: 1 * time.Second,
 				},
 				ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
 				MinTime: 1,
@@ -1264,7 +1266,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
 						storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{4, 1}, {5, 2}, {6, 3}}),
 						storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{7, 1}, {8, 2}, {9, 3}}),
 					},
-					RespDuration: 3 * time.Second,
+					RespDuration: 1 * time.Second,
 				},
 				ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
 				MinTime: 1,
@@ -1281,12 +1283,16 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
 					lset: labels.FromStrings("a", "b"),
 					chunks: [][]sample{
 						{{1, 1}, {2, 2}, {3, 3}},
+						{{4, 1}, {5, 2}, {6, 3}},
+						{{7, 1}, {8, 2}, {9, 3}},
 					},
 				},
 				{
 					lset: labels.FromStrings("b", "c"),
 					chunks: [][]sample{
 						{{1, 1}, {2, 2}, {3, 3}},
+						{{4, 1}, {5, 2}, {6, 3}},
+						{{7, 1}, {8, 2}, {9, 3}},
 					},
 				},
 			},
@@ -1300,22 +1306,31 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
 	if ok := t.Run(tc.title, func(t *testing.T) {
 		for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
 			if ok := t.Run(string(strategy), func(t *testing.T) {
+				// Use more reasonable timeouts.
+				proxyTimeout := 3 * time.Second
+				contextTimeout := 4 * time.Second
+
+				if tc.expectTimeoutBehavior {
+					// For timeout tests, use shorter timeouts.
+					proxyTimeout = 1500 * time.Millisecond
+					contextTimeout = 2 * time.Second
+				}
+
 				q := NewProxyStore(nil,
 					nil,
 					func() []Client { return tc.storeAPIs },
 					component.Query,
 					tc.selectorLabels,
-					4*time.Second, strategy,
+					proxyTimeout, strategy,
 					options...,
 				)

-				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+				ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
 				defer cancel()
 				s := newStoreSeriesServer(ctx)

 				t0 := time.Now()
 				err := q.Series(tc.req, s)
 				elapsedTime := time.Since(t0)

 				if tc.expectedErr != nil {
 					testutil.NotOk(t, err)
 					testutil.Equals(t, tc.expectedErr.Error(), err.Error())
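The restructured test derives two timeouts per case: the proxy's own response timeout and a slightly larger outer context timeout, so in the timeout cases (1.5s proxy vs. 2s context vs. 2s store delay) the proxy's error surfaces instead of the caller's deadline. A simplified model of that layering (our names and message text, not the test's):

```go
// Sketch of the two-level timeout relationship the test above relies on:
// the per-request (proxy) timeout must be shorter than the outer context
// timeout so the request fails with the proxy's own error, not the
// caller's deadline. Durations mirror the test; names are illustrative.
package main

import (
	"context"
	"fmt"
	"time"
)

func callWithTimeouts(proxyTimeout, contextTimeout, respDuration time.Duration) error {
	outer, cancel := context.WithTimeout(context.Background(), contextTimeout)
	defer cancel()

	// The proxy applies its own, shorter deadline per request.
	inner, cancelInner := context.WithTimeout(outer, proxyTimeout)
	defer cancelInner()

	select {
	case <-time.After(respDuration): // the slow store "responds"
		return nil
	case <-inner.Done():
		return fmt.Errorf("no data received in %s: %w", proxyTimeout, inner.Err())
	}
}

func main() {
	// Timeout case: store takes 2s, proxy allows 1.5s, context allows 2s.
	fmt.Println(callWithTimeouts(1500*time.Millisecond, 2*time.Second, 2*time.Second))
	// Healthy case: store takes 1s, proxy allows 3s, context allows 4s.
	fmt.Println(callWithTimeouts(3*time.Second, 4*time.Second, time.Second))
}
```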
@@ -1327,7 +1342,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {
 				seriesEquals(t, tc.expectedSeries, s.SeriesSet)
 				testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings)

-				testutil.Assert(t, elapsedTime < 5010*time.Millisecond, fmt.Sprintf("Request has taken %f, expected: <%d, it seems that responseTimeout doesn't work properly.", elapsedTime.Seconds(), 5))
+				// Note: We avoid timing assertions as they are flaky in CI environments.

 			}); !ok {
 				return
@@ -1340,7 +1355,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) {

 	// Wait until the last goroutine exits which is stuck on time.Sleep().
 	// Otherwise, goleak complains.
-	time.Sleep(5 * time.Second)
+	time.Sleep(2 * time.Second)
 }

 func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
@@ -777,6 +777,7 @@ type RulerBuilder struct {
 	evalInterval         string
 	forGracePeriod       string
 	restoreIgnoredLabels []string
+	nativeHistograms     bool
 }

 // NewRulerBuilder is a Ruler future that allows extra configuration before initialization.
@@ -827,6 +828,11 @@ func (r *RulerBuilder) WithRestoreIgnoredLabels(labels ...string) *RulerBuilder {
 	return r
 }

+func (r *RulerBuilder) WithNativeHistograms() *RulerBuilder {
+	r.nativeHistograms = true
+	return r
+}
+
 func (r *RulerBuilder) InitTSDB(internalRuleDir string, queryCfg []clientconfig.Config) *e2eobs.Observable {
 	return r.initRule(internalRuleDir, queryCfg, nil)
 }
@@ -894,6 +900,10 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []clientconfig.Config, ...
 		ruleArgs["--remote-write.config"] = string(rwCfgBytes)
 	}

+	if r.nativeHistograms {
+		ruleArgs["--tsdb.enable-native-histograms"] = ""
+	}
+
 	args := e2e.BuildArgs(ruleArgs)

 	for _, label := range r.restoreIgnoredLabels {
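The builder changes follow the file's existing convention: a fluent `WithNativeHistograms()` setter records the option, and `initRule` renders it as a bare boolean flag by mapping it to an empty string value. A minimal sketch of that convention (hypothetical types; treating empty values as bare flags is our assumption about `e2e.BuildArgs`, not confirmed by this diff):

```go
// Minimal sketch of the fluent-builder-to-args convention used above.
// All names are illustrative; "" as a map value renders a bare flag.
package main

import "fmt"

type RulerBuilder struct {
	nativeHistograms bool
}

func (r *RulerBuilder) WithNativeHistograms() *RulerBuilder {
	r.nativeHistograms = true
	return r
}

func (r *RulerBuilder) args() []string {
	flags := map[string]string{}
	if r.nativeHistograms {
		flags["--tsdb.enable-native-histograms"] = "" // bare boolean flag
	}
	out := make([]string, 0, len(flags))
	for k, v := range flags {
		if v == "" {
			out = append(out, k) // no "=value" suffix
		} else {
			out = append(out, k+"="+v)
		}
	}
	return out
}

func main() {
	fmt.Println((&RulerBuilder{}).WithNativeHistograms().args())
}
```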
@@ -394,6 +394,64 @@ func TestRuleNativeHistograms(t *testing.T) {
 	queryAndAssert(t, ctx, q.Endpoint("http"), func() string { return expectedRecordedName }, time.Now, promclient.QueryOptions{Deduplicate: true}, expectedHistogramModelVector(expectedRecordedName, nil, expectedRecordedHistogram, map[string]string{"tenant_id": "default-tenant"}))
 }
+
+func TestRuleNativeHistogramsTSDB(t *testing.T) {
+	t.Parallel()
+
+	e, err := e2e.NewDockerEnvironment("hist-rule-tsdb")
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	t.Cleanup(cancel)
+
+	rFuture := e2ethanos.NewRulerBuilder(e, "1").WithNativeHistograms()
+	rulesSubDir := "rules"
+	rulesPath := filepath.Join(rFuture.Dir(), rulesSubDir)
+	testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm))
+
+	for i, rule := range []string{testRuleRecordHistogramSum} {
+		createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-%d.yaml", i)), rule)
+	}
+
+	receiver := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled().WithNativeHistograms().Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(receiver))
+
+	receiver2 := e2ethanos.NewReceiveBuilder(e, "2").WithIngestionEnabled().WithNativeHistograms().Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(receiver2))
+
+	q := e2ethanos.NewQuerierBuilder(e, "1", receiver.InternalEndpoint("grpc"), receiver2.InternalEndpoint("grpc")).WithReplicaLabels("receive", "replica").Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(q))
+
+	histograms := tsdbutil.GenerateTestHistograms(4)
+	ts := time.Now().Add(-2 * time.Minute)
+	rawRemoteWriteURL1 := "http://" + receiver.Endpoint("remote-write") + "/api/v1/receive"
+	_, err = writeHistograms(ctx, ts, testHistogramMetricName, histograms, nil, rawRemoteWriteURL1, prompb.Label{Name: "series", Value: "one"})
+	testutil.Ok(t, err)
+	rawRemoteWriteURL2 := "http://" + receiver2.Endpoint("remote-write") + "/api/v1/receive"
+	_, err = writeHistograms(ctx, ts, testHistogramMetricName, histograms, nil, rawRemoteWriteURL2, prompb.Label{Name: "series", Value: "two"})
+	testutil.Ok(t, err)
+
+	r := rFuture.InitTSDB(filepath.Join(rFuture.InternalDir(), rulesSubDir), []clientconfig.Config{
+		{
+			GRPCConfig: &clientconfig.GRPCConfig{
+				EndpointAddrs: []string{q.InternalEndpoint("grpc")},
+			},
+		},
+	})
+	testutil.Ok(t, e2e.StartAndWaitReady(r))
+
+	qR := e2ethanos.NewQuerierBuilder(e, "2", r.InternalEndpoint("grpc")).Init()
+	testutil.Ok(t, e2e.StartAndWaitReady(qR))
+
+	// Wait until samples are written successfully.
+	histogramMatcher, _ := matchers.NewMatcher(matchers.MatchEqual, "type", "histogram")
+	testutil.Ok(t, r.WaitSumMetricsWithOptions(e2emon.GreaterOrEqual(1), []string{"prometheus_tsdb_head_samples_appended_total"}, e2emon.WithLabelMatchers(histogramMatcher), e2emon.WaitMissingMetrics()))
+
+	expectedRecordedName := testHistogramMetricName + ":sum"
+	expectedRecordedHistogram := histograms[len(histograms)-1].ToFloat(nil).Mul(2)
+	queryAndAssert(t, ctx, qR.Endpoint("http"), func() string { return expectedRecordedName }, time.Now, promclient.QueryOptions{Deduplicate: true}, expectedHistogramModelVector(expectedRecordedName, nil, expectedRecordedHistogram, nil))
+}
+
 func writeHistograms(ctx context.Context, now time.Time, name string, histograms []*histogram.Histogram, floatHistograms []*histogram.FloatHistogram, rawRemoteWriteURL string, labels ...prompb.Label) (time.Time, error) {
 	startTime := now.Add(time.Duration(len(histograms)-1) * -30 * time.Second).Truncate(30 * time.Second)
 	prompbHistograms := make([]prompb.Histogram, 0, len(histograms))
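Why `.Mul(2)` in the assertion above: the same four test histograms are written to both receivers under different `series` labels, the recording rule sums them, and replica deduplication collapses duplicates, so the recorded value is exactly twice the last generated histogram. A sketch of that arithmetic with the real Prometheus model types (hedged: the recording rule itself, `testRuleRecordHistogramSum`, is not shown in this diff):

```go
// Reproduces the expected-value arithmetic from the test above:
// sum over two identical series == last generated histogram times two.
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/tsdbutil"
)

func main() {
	hs := tsdbutil.GenerateTestHistograms(4)
	last := hs[len(hs)-1]

	// ToFloat(nil) converts the integer histogram to a FloatHistogram;
	// Mul scales every bucket, the count, and the sum by the factor.
	expected := last.ToFloat(nil).Mul(2)
	fmt.Println("count:", expected.Count, "sum:", expected.Sum)
}
```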
@@ -34,6 +34,7 @@ import (

 	"github.com/efficientgo/core/testutil"

+	"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
 	"github.com/thanos-io/thanos/pkg/promclient"
 	"github.com/thanos-io/thanos/pkg/receive"
 	"github.com/thanos-io/thanos/pkg/runutil"
@@ -1210,7 +1211,7 @@ func TestReceiveCpnp(t *testing.T) {
 	testutil.Ok(t, err)
 	t.Cleanup(e2ethanos.CleanScenario(t, e))

-	i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
+	i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().WithExemplarsInMemStorage(100).Init()
 	testutil.Ok(t, e2e.StartAndWaitReady(i))

 	h := receive.HashringConfig{
@@ -1232,6 +1233,15 @@ func TestReceiveCpnp(t *testing.T) {
 		return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{
 			Timeseries: []prompb.TimeSeries{
 				{
+					Exemplars: []prompb.Exemplar{
+						{
+							Labels: []prompb.Label{
+								{Name: "receive", Value: "receive-ingestor"},
+							},
+							Value:     1.2345,
+							Timestamp: timestamp.FromTime(ts),
+						},
+					},
 					Labels: []prompb.Label{
 						{Name: model.MetricNameLabel, Value: "myself"},
 					},
@@ -1265,4 +1275,12 @@ func TestReceiveCpnp(t *testing.T) {
 		},
 	}, v)

+	// TODO(GiedriusS): repro for https://github.com/thanos-io/thanos/issues/8224. Fix in following PRs.
+	queryExemplars(
+		t, context.Background(), q.Endpoint("http"), "myself", timestamp.FromTime(ts), timestamp.FromTime(ts), func(data []*exemplarspb.ExemplarData) error {
+			require.Equal(t, "\000\000\000\000\000\000\000", data[0].Exemplars[0].Labels.Labels[0].Name)
+			return nil
+		},
+	)
+
 }
@@ -40,6 +40,8 @@ groups:
 - name: example_abort
   interval: 1s
   # Abort should be the default: partial_response_strategy: "ABORT"
+  labels:
+    foo: bar
   rules:
   - alert: TestAlert_AbortOnPartialResponse
     # It must be based on actual metrics, otherwise the call to StoreAPI would not be involved.
@@ -501,6 +503,7 @@ func TestRule(t *testing.T) {
 			"__name__":   "ALERTS",
 			"severity":   "page",
 			"alertname":  "TestAlert_AbortOnPartialResponse",
+			"foo":        "bar",
 			"alertstate": "firing",
 			"replica":    "1",
 		},
@@ -521,11 +524,6 @@ func TestRule(t *testing.T) {
 	})

 	expAlertLabels := []model.LabelSet{
-		{
-			"severity":  "page",
-			"alertname": "TestAlert_AbortOnPartialResponse",
-			"replica":   "1",
-		},
 		{
 			"severity":  "page",
 			"alertname": "TestAlert_HasBeenLoadedViaWebHandler",
@@ -536,6 +534,12 @@ func TestRule(t *testing.T) {
 			"alertname": "TestAlert_WarnOnPartialResponse",
 			"replica":   "1",
 		},
+		{
+			"severity":  "page",
+			"foo":       "bar",
+			"alertname": "TestAlert_AbortOnPartialResponse",
+			"replica":   "1",
+		},
 	}

 	alrts, err := promclient.NewDefaultClient().AlertmanagerAlerts(ctx, urlParse(t, "http://"+am2.Endpoint("http")))
@@ -102,6 +102,7 @@ func TestRulesAPI_Fanout(t *testing.T) {
 			State: rulespb.AlertState_FIRING,
 			Query: "absent(some_metric)",
 			Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
+				{Name: "foo", Value: "bar"},
 				{Name: "prometheus", Value: "ha"},
 				{Name: "severity", Value: "page"},
 			}},
@@ -118,6 +119,7 @@ func TestRulesAPI_Fanout(t *testing.T) {
 			State: rulespb.AlertState_FIRING,
 			Query: "absent(some_metric)",
 			Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{
+				{Name: "foo", Value: "bar"},
 				{Name: "severity", Value: "page"},
 			}},
 			Health: string(rules.HealthGood),