stats: implement metric expiry (#4597)

* stats: implement metric expiry

Signed-off-by: Kuat Yessenov <kuat@google.com>

* missed file

Signed-off-by: Kuat Yessenov <kuat@google.com>

* mitigate data race

Signed-off-by: Kuat Yessenov <kuat@google.com>

---------

Signed-off-by: Kuat Yessenov <kuat@google.com>
This commit is contained in:
Kuat 2023-04-21 14:46:54 -07:00 committed by GitHub
parent 78f763edff
commit 3f352eefb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 137 additions and 14 deletions

View File

@ -111,4 +111,12 @@ message PluginConfig {
// Proxy deployment type.
Reporter reporter = 10;
// Metric scope rotation interval. Set to 0 to disable the metric scope rotation.
// Defaults to 0.
google.protobuf.Duration rotation_interval = 11;
// Metric expiry graceful deletion interval. No-op if the metric rotation is disabled.
// Defaults to 5m. Must be >=1s.
google.protobuf.Duration graceful_deletion_interval = 12;
}

View File

@ -14,6 +14,8 @@
#include "source/extensions/filters/http/istio_stats/istio_stats.h"
#include <atomic>
#include "envoy/registry/registry.h"
#include "envoy/server/factory_context.h"
#include "envoy/singleton/manager.h"
@ -103,8 +105,7 @@ enum class Reporter {
ClientSidecar,
// Regular inbound listener on a sidecar.
ServerSidecar,
// Inbound listener on a shared proxy. The destination properties are derived
// from the endpoint metadata instead of the proxy bootstrap.
// Gateway listener for a set of destination workloads.
ServerGateway,
};
@ -457,6 +458,73 @@ struct MetricOverrides : public Logger::Loggable<Logger::Id::filter>,
absl::flat_hash_map<std::string, uint32_t> expression_ids_;
};
// Self-managed scope with active rotation. Envoy stats scope controls the
// lifetime of the individual metrics. Because the scope is attached to xDS
// resources, metrics with data derived from the requests can accumulate and
// grow indefinitely for long-living xDS resources. To limit this growth,
// this class implements a rotation mechanism, whereas a new scope is created
// periodically to replace the current scope.
//
// The replaced stats scope is deleted gracefully after a minimum of 1s delay
// for two reasons:
//
// 1. Stats flushing is asynchronous and the data may be lost if not flushed
// before the deletion (see stats_flush_interval).
//
// 2. The implementation avoids locking by releasing a raw pointer to workers.
// When the rotation happens on the main, the raw pointer may still be in-use
// by workers for a short duration.
class RotatingScope : public Logger::Loggable<Logger::Id::filter> {
public:
RotatingScope(Server::Configuration::FactoryContext& factory_context, uint64_t rotate_interval_ms,
uint64_t delete_interval_ms)
: parent_scope_(factory_context.scope()), active_scope_(parent_scope_.createScope("")),
raw_scope_(active_scope_.get()), rotate_interval_ms_(rotate_interval_ms),
delete_interval_ms_(delete_interval_ms) {
if (rotate_interval_ms_ > 0) {
ASSERT(delete_interval_ms_ < rotate_interval_ms_);
ASSERT(delete_interval_ms_ >= 1000);
Event::Dispatcher& dispatcher = factory_context.mainThreadDispatcher();
rotate_timer_ = dispatcher.createTimer([this] { onRotate(); });
delete_timer_ = dispatcher.createTimer([this] { onDelete(); });
rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_));
}
}
~RotatingScope() {
if (rotate_timer_) {
rotate_timer_->disableTimer();
rotate_timer_.reset();
}
if (delete_timer_) {
delete_timer_->disableTimer();
delete_timer_.reset();
}
}
Stats::Scope* scope() { return raw_scope_.load(); }
private:
void onRotate() {
ENVOY_LOG(info, "Rotating active Istio stats scope after {}ms.", rotate_interval_ms_);
draining_scope_ = active_scope_;
delete_timer_->enableTimer(std::chrono::milliseconds(delete_interval_ms_));
active_scope_ = parent_scope_.createScope("");
raw_scope_.store(active_scope_.get());
rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_));
}
void onDelete() {
ENVOY_LOG(info, "Deleting draining Istio stats scope after {}ms.", delete_interval_ms_);
draining_scope_.reset();
}
Stats::Scope& parent_scope_;
Stats::ScopeSharedPtr active_scope_;
std::atomic<Stats::Scope*> raw_scope_;
Stats::ScopeSharedPtr draining_scope_{nullptr};
const uint64_t rotate_interval_ms_;
const uint64_t delete_interval_ms_;
Event::TimerPtr rotate_timer_{nullptr};
Event::TimerPtr delete_timer_{nullptr};
};
struct Config : public Logger::Loggable<Logger::Id::filter> {
Config(const stats::PluginConfig& proto_config,
Server::Configuration::FactoryContext& factory_context)
@ -466,7 +534,9 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
return std::make_shared<Context>(factory_context.serverScope().symbolTable(),
factory_context.localInfo().node());
})),
scope_(factory_context.scope()),
scope_(factory_context, PROTOBUF_GET_MS_OR_DEFAULT(proto_config, rotation_interval, 0),
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, graceful_deletion_interval,
/* 5m */ 1000 * 60 * 5)),
disable_host_header_fallback_(proto_config.disable_host_header_fallback()),
report_duration_(
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, tcp_reporting_duration, /* 5s */ 5000)),
@ -493,7 +563,7 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
break;
}
if (proto_config.metrics_size() > 0 || proto_config.definitions_size() > 0) {
metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope_.symbolTable());
metric_overrides_ = std::make_unique<MetricOverrides>(context_, scope()->symbolTable());
for (const auto& definition : proto_config.definitions()) {
const auto& it = context_->all_metrics_.find(definition.name());
if (it != context_->all_metrics_.end()) {
@ -629,12 +699,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
return;
}
auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_);
Stats::Utility::counterFromStatNames(parent_.scope_,
Stats::Utility::counterFromStatNames(*parent_.scope(),
{parent_.context_->stat_namespace_, metric}, new_tags)
.add(amount);
return;
}
Stats::Utility::counterFromStatNames(parent_.scope_,
Stats::Utility::counterFromStatNames(*parent_.scope(),
{parent_.context_->stat_namespace_, metric}, tags)
.add(amount);
}
@ -647,12 +717,12 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
}
auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_);
Stats::Utility::histogramFromStatNames(
parent_.scope_, {parent_.context_->stat_namespace_, metric}, unit, new_tags)
*parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, new_tags)
.recordValue(value);
return;
}
Stats::Utility::histogramFromStatNames(
parent_.scope_, {parent_.context_->stat_namespace_, metric}, unit, tags)
*parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, tags)
.recordValue(value);
}
@ -664,17 +734,17 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
switch (metric.type_) {
case MetricOverrides::MetricType::Counter:
Stats::Utility::counterFromStatNames(
parent_.scope_, {parent_.context_->stat_namespace_, metric.name_}, tags)
*parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, tags)
.add(amount);
break;
case MetricOverrides::MetricType::Histogram:
Stats::Utility::histogramFromStatNames(
parent_.scope_, {parent_.context_->stat_namespace_, metric.name_},
*parent_.scope(), {parent_.context_->stat_namespace_, metric.name_},
Stats::Histogram::Unit::Bytes, tags)
.recordValue(amount);
break;
case MetricOverrides::MetricType::Gauge:
Stats::Utility::gaugeFromStatNames(parent_.scope_,
Stats::Utility::gaugeFromStatNames(*parent_.scope(),
{parent_.context_->stat_namespace_, metric.name_},
Stats::Gauge::ImportMode::Accumulate, tags)
.set(amount);
@ -696,15 +766,17 @@ struct Config : public Logger::Loggable<Logger::Id::filter> {
tags.push_back({context_->tag_, context_->istio_version_.empty() ? context_->unknown_
: context_->istio_version_});
Stats::Utility::gaugeFromStatNames(scope_, {context_->stat_namespace_, context_->istio_build_},
Stats::Utility::gaugeFromStatNames(*scope(),
{context_->stat_namespace_, context_->istio_build_},
Stats::Gauge::ImportMode::Accumulate, tags)
.set(1);
}
Reporter reporter() const { return reporter_; }
Stats::Scope* scope() { return scope_.scope(); }
ContextSharedPtr context_;
Stats::Scope& scope_;
RotatingScope scope_;
Reporter reporter_;
const bool disable_host_header_fallback_;
@ -721,7 +793,7 @@ class IstioStatsFilter : public Http::PassThroughFilter,
public Network::ConnectionCallbacks {
public:
IstioStatsFilter(ConfigSharedPtr config)
: config_(config), context_(*config->context_), pool_(config->scope_.symbolTable()) {
: config_(config), context_(*config->context_), pool_(config->scope()->symbolTable()) {
tags_.reserve(25);
switch (config_->reporter()) {
case Reporter::ServerSidecar:

View File

@ -86,6 +86,7 @@ func init() {
"TestStatsPayload/UseHostHeader/envoy.wasm.runtime.v8",
"TestStatsPayload/UseHostHeader/",
"TestStatsParserRegression",
"TestStatsExpiry",
"TestTCPMetadataExchange",
"TestTCPMetadataExchangeNoAlpn",
},

View File

@ -751,3 +751,43 @@ func TestStatsServerWaypointProxyCONNECT(t *testing.T) {
t.Fatal(err)
}
}
func TestStatsExpiry(t *testing.T) {
params := driver.NewTestParams(t, map[string]string{
"RequestCount": "1",
"StatsConfig": driver.LoadTestData("testdata/bootstrap/stats.yaml.tmpl"),
"StatsFilterClientConfig": driver.LoadTestJSON("testdata/stats/client_config_expiry.yaml"),
"StatsFilterServerConfig": driver.LoadTestJSON("testdata/stats/server_config.yaml"),
}, envoye2e.ProxyE2ETests)
params.Vars["ClientMetadata"] = params.LoadTestData("testdata/client_node_metadata.json.tmpl")
params.Vars["ServerMetadata"] = params.LoadTestData("testdata/server_node_metadata.json.tmpl")
enableStats(t, params.Vars)
if err := (&driver.Scenario{
Steps: []driver.Step{
&driver.XDS{},
&driver.Update{
Node: "client",
Version: "0",
Clusters: []string{params.LoadTestData("testdata/cluster/server.yaml.tmpl")},
Listeners: []string{params.LoadTestData("testdata/listener/client.yaml.tmpl")},
},
&driver.Update{Node: "server", Version: "0", Listeners: []string{params.LoadTestData("testdata/listener/server.yaml.tmpl")}},
&driver.Envoy{Bootstrap: params.LoadTestData("testdata/bootstrap/server.yaml.tmpl")},
&driver.Envoy{Bootstrap: params.LoadTestData("testdata/bootstrap/client.yaml.tmpl")},
&driver.Sleep{Duration: 1 * time.Second},
&driver.Repeat{
N: 1,
Step: &driver.HTTPCall{
Port: params.Ports.ClientPort,
Body: "hello, world!",
},
},
&driver.Sleep{Duration: 4 * time.Second},
&driver.Stats{AdminPort: params.Ports.ClientAdmin, Matchers: map[string]driver.StatMatcher{
"istio_requests_total": &driver.MissingStat{Metric: "istio_requests_total"},
}},
},
}).Run(params); err != nil {
t.Fatal(err)
}
}

View File

@ -0,0 +1,2 @@
rotation_interval: 2s
graceful_deletion_interval: 1s