diff --git a/source/extensions/filters/http/istio_stats/config.proto b/source/extensions/filters/http/istio_stats/config.proto index 8694b4591..cff0963a1 100644 --- a/source/extensions/filters/http/istio_stats/config.proto +++ b/source/extensions/filters/http/istio_stats/config.proto @@ -111,4 +111,12 @@ message PluginConfig { // Proxy deployment type. Reporter reporter = 10; + + // Metric scope rotation interval. Set to 0 to disable the metric scope rotation. + // Defaults to 0. + google.protobuf.Duration rotation_interval = 11; + + // Metric expiry graceful deletion interval. No-op if the metric rotation is disabled. + // Defaults to 5m. Must be >=1s and less than rotation_interval. + google.protobuf.Duration graceful_deletion_interval = 12; } diff --git a/source/extensions/filters/http/istio_stats/istio_stats.cc b/source/extensions/filters/http/istio_stats/istio_stats.cc index 4da4d98be..35a6e1c1b 100644 --- a/source/extensions/filters/http/istio_stats/istio_stats.cc +++ b/source/extensions/filters/http/istio_stats/istio_stats.cc @@ -14,6 +14,8 @@ #include "source/extensions/filters/http/istio_stats/istio_stats.h" +#include + #include "envoy/registry/registry.h" #include "envoy/server/factory_context.h" #include "envoy/singleton/manager.h" @@ -103,8 +105,7 @@ enum class Reporter { ClientSidecar, // Regular inbound listener on a sidecar. ServerSidecar, - // Inbound listener on a shared proxy. The destination properties are derived - // from the endpoint metadata instead of the proxy bootstrap. + // Gateway listener for a set of destination workloads. ServerGateway, }; @@ -457,6 +458,73 @@ struct MetricOverrides : public Logger::Loggable, absl::flat_hash_map expression_ids_; }; +// Self-managed scope with active rotation. Envoy stats scope controls the +// lifetime of the individual metrics. Because the scope is attached to xDS +// resources, metrics with data derived from the requests can accumulate and +// grow indefinitely for long-living xDS resources. 
To limit this growth, +// this class implements a rotation mechanism, whereby a new scope is created +// periodically to replace the current scope. +// +// The replaced stats scope is deleted gracefully after a minimum of 1s delay +// for two reasons: +// +// 1. Stats flushing is asynchronous and the data may be lost if not flushed +// before the deletion (see stats_flush_interval). +// +// 2. The implementation avoids locking by releasing a raw pointer to workers. +// When the rotation happens on the main thread, the raw pointer may still be in-use +// by workers for a short duration. +class RotatingScope : public Logger::Loggable { +public: + RotatingScope(Server::Configuration::FactoryContext& factory_context, uint64_t rotate_interval_ms, + uint64_t delete_interval_ms) + : parent_scope_(factory_context.scope()), active_scope_(parent_scope_.createScope("")), + raw_scope_(active_scope_.get()), rotate_interval_ms_(rotate_interval_ms), + delete_interval_ms_(delete_interval_ms) { + if (rotate_interval_ms_ > 0) { + ASSERT(delete_interval_ms_ < rotate_interval_ms_); + ASSERT(delete_interval_ms_ >= 1000); + Event::Dispatcher& dispatcher = factory_context.mainThreadDispatcher(); + rotate_timer_ = dispatcher.createTimer([this] { onRotate(); }); + delete_timer_ = dispatcher.createTimer([this] { onDelete(); }); + rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_)); + } + } + ~RotatingScope() { + if (rotate_timer_) { + rotate_timer_->disableTimer(); + rotate_timer_.reset(); + } + if (delete_timer_) { + delete_timer_->disableTimer(); + delete_timer_.reset(); + } + } + Stats::Scope* scope() { return raw_scope_.load(); } + +private: + void onRotate() { + ENVOY_LOG(info, "Rotating active Istio stats scope after {}ms.", rotate_interval_ms_); + draining_scope_ = active_scope_; + delete_timer_->enableTimer(std::chrono::milliseconds(delete_interval_ms_)); + active_scope_ = parent_scope_.createScope(""); + raw_scope_.store(active_scope_.get()); 
+ rotate_timer_->enableTimer(std::chrono::milliseconds(rotate_interval_ms_)); + } + void onDelete() { + ENVOY_LOG(info, "Deleting draining Istio stats scope after {}ms.", delete_interval_ms_); + draining_scope_.reset(); + } + Stats::Scope& parent_scope_; + Stats::ScopeSharedPtr active_scope_; + std::atomic raw_scope_; + Stats::ScopeSharedPtr draining_scope_{nullptr}; + const uint64_t rotate_interval_ms_; + const uint64_t delete_interval_ms_; + Event::TimerPtr rotate_timer_{nullptr}; + Event::TimerPtr delete_timer_{nullptr}; +}; + struct Config : public Logger::Loggable { Config(const stats::PluginConfig& proto_config, Server::Configuration::FactoryContext& factory_context) @@ -466,7 +534,9 @@ struct Config : public Logger::Loggable { return std::make_shared(factory_context.serverScope().symbolTable(), factory_context.localInfo().node()); })), - scope_(factory_context.scope()), + scope_(factory_context, PROTOBUF_GET_MS_OR_DEFAULT(proto_config, rotation_interval, 0), + PROTOBUF_GET_MS_OR_DEFAULT(proto_config, graceful_deletion_interval, + /* 5m */ 1000 * 60 * 5)), disable_host_header_fallback_(proto_config.disable_host_header_fallback()), report_duration_( PROTOBUF_GET_MS_OR_DEFAULT(proto_config, tcp_reporting_duration, /* 5s */ 5000)), @@ -493,7 +563,7 @@ struct Config : public Logger::Loggable { break; } if (proto_config.metrics_size() > 0 || proto_config.definitions_size() > 0) { - metric_overrides_ = std::make_unique(context_, scope_.symbolTable()); + metric_overrides_ = std::make_unique(context_, scope()->symbolTable()); for (const auto& definition : proto_config.definitions()) { const auto& it = context_->all_metrics_.find(definition.name()); if (it != context_->all_metrics_.end()) { @@ -629,12 +699,12 @@ struct Config : public Logger::Loggable { return; } auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_); - Stats::Utility::counterFromStatNames(parent_.scope_, + Stats::Utility::counterFromStatNames(*parent_.scope(), 
{parent_.context_->stat_namespace_, metric}, new_tags) .add(amount); return; } - Stats::Utility::counterFromStatNames(parent_.scope_, + Stats::Utility::counterFromStatNames(*parent_.scope(), {parent_.context_->stat_namespace_, metric}, tags) .add(amount); } @@ -647,12 +717,12 @@ struct Config : public Logger::Loggable { } auto new_tags = parent_.metric_overrides_->overrideTags(metric, tags, expr_values_); Stats::Utility::histogramFromStatNames( - parent_.scope_, {parent_.context_->stat_namespace_, metric}, unit, new_tags) + *parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, new_tags) .recordValue(value); return; } Stats::Utility::histogramFromStatNames( - parent_.scope_, {parent_.context_->stat_namespace_, metric}, unit, tags) + *parent_.scope(), {parent_.context_->stat_namespace_, metric}, unit, tags) .recordValue(value); } @@ -664,17 +734,17 @@ struct Config : public Logger::Loggable { switch (metric.type_) { case MetricOverrides::MetricType::Counter: Stats::Utility::counterFromStatNames( - parent_.scope_, {parent_.context_->stat_namespace_, metric.name_}, tags) + *parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, tags) .add(amount); break; case MetricOverrides::MetricType::Histogram: Stats::Utility::histogramFromStatNames( - parent_.scope_, {parent_.context_->stat_namespace_, metric.name_}, + *parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, Stats::Histogram::Unit::Bytes, tags) .recordValue(amount); break; case MetricOverrides::MetricType::Gauge: - Stats::Utility::gaugeFromStatNames(parent_.scope_, + Stats::Utility::gaugeFromStatNames(*parent_.scope(), {parent_.context_->stat_namespace_, metric.name_}, Stats::Gauge::ImportMode::Accumulate, tags) .set(amount); @@ -696,15 +766,17 @@ struct Config : public Logger::Loggable { tags.push_back({context_->tag_, context_->istio_version_.empty() ? 
context_->unknown_ : context_->istio_version_}); - Stats::Utility::gaugeFromStatNames(scope_, {context_->stat_namespace_, context_->istio_build_}, + Stats::Utility::gaugeFromStatNames(*scope(), + {context_->stat_namespace_, context_->istio_build_}, Stats::Gauge::ImportMode::Accumulate, tags) .set(1); } Reporter reporter() const { return reporter_; } + Stats::Scope* scope() { return scope_.scope(); } ContextSharedPtr context_; - Stats::Scope& scope_; + RotatingScope scope_; Reporter reporter_; const bool disable_host_header_fallback_; @@ -721,7 +793,7 @@ class IstioStatsFilter : public Http::PassThroughFilter, public Network::ConnectionCallbacks { public: IstioStatsFilter(ConfigSharedPtr config) - : config_(config), context_(*config->context_), pool_(config->scope_.symbolTable()) { + : config_(config), context_(*config->context_), pool_(config->scope()->symbolTable()) { tags_.reserve(25); switch (config_->reporter()) { case Reporter::ServerSidecar: diff --git a/test/envoye2e/inventory.go b/test/envoye2e/inventory.go index 0a432e5ad..c8023813d 100644 --- a/test/envoye2e/inventory.go +++ b/test/envoye2e/inventory.go @@ -86,6 +86,7 @@ func init() { "TestStatsPayload/UseHostHeader/envoy.wasm.runtime.v8", "TestStatsPayload/UseHostHeader/", "TestStatsParserRegression", + "TestStatsExpiry", "TestTCPMetadataExchange", "TestTCPMetadataExchangeNoAlpn", }, diff --git a/test/envoye2e/stats_plugin/stats_test.go b/test/envoye2e/stats_plugin/stats_test.go index b3234dcfa..af952db5c 100644 --- a/test/envoye2e/stats_plugin/stats_test.go +++ b/test/envoye2e/stats_plugin/stats_test.go @@ -751,3 +751,43 @@ func TestStatsServerWaypointProxyCONNECT(t *testing.T) { t.Fatal(err) } } + +func TestStatsExpiry(t *testing.T) { + params := driver.NewTestParams(t, map[string]string{ + "RequestCount": "1", + "StatsConfig": driver.LoadTestData("testdata/bootstrap/stats.yaml.tmpl"), + "StatsFilterClientConfig": driver.LoadTestJSON("testdata/stats/client_config_expiry.yaml"), 
+ "StatsFilterServerConfig": driver.LoadTestJSON("testdata/stats/server_config.yaml"), + }, envoye2e.ProxyE2ETests) + params.Vars["ClientMetadata"] = params.LoadTestData("testdata/client_node_metadata.json.tmpl") + params.Vars["ServerMetadata"] = params.LoadTestData("testdata/server_node_metadata.json.tmpl") + enableStats(t, params.Vars) + if err := (&driver.Scenario{ + Steps: []driver.Step{ + &driver.XDS{}, + &driver.Update{ + Node: "client", + Version: "0", + Clusters: []string{params.LoadTestData("testdata/cluster/server.yaml.tmpl")}, + Listeners: []string{params.LoadTestData("testdata/listener/client.yaml.tmpl")}, + }, + &driver.Update{Node: "server", Version: "0", Listeners: []string{params.LoadTestData("testdata/listener/server.yaml.tmpl")}}, + &driver.Envoy{Bootstrap: params.LoadTestData("testdata/bootstrap/server.yaml.tmpl")}, + &driver.Envoy{Bootstrap: params.LoadTestData("testdata/bootstrap/client.yaml.tmpl")}, + &driver.Sleep{Duration: 1 * time.Second}, + &driver.Repeat{ + N: 1, + Step: &driver.HTTPCall{ + Port: params.Ports.ClientPort, + Body: "hello, world!", + }, + }, + &driver.Sleep{Duration: 4 * time.Second}, + &driver.Stats{AdminPort: params.Ports.ClientAdmin, Matchers: map[string]driver.StatMatcher{ + "istio_requests_total": &driver.MissingStat{Metric: "istio_requests_total"}, + }}, + }, + }).Run(params); err != nil { + t.Fatal(err) + } +} diff --git a/testdata/stats/client_config_expiry.yaml b/testdata/stats/client_config_expiry.yaml new file mode 100644 index 000000000..1b95d3d47 --- /dev/null +++ b/testdata/stats/client_config_expiry.yaml @@ -0,0 +1,2 @@ +rotation_interval: 2s +graceful_deletion_interval: 1s