mirror of https://github.com/spiffe/spire.git

Compare commits

2 Commits

| Author | SHA1 | Date |
|---|---|---|
| | 6ba4d56c47 | |
| | 7b792722c6 | |

CHANGELOG.md (20 changes)
@@ -1,5 +1,25 @@
 # Changelog
 
+## [1.10.2] - 2024-09-03
+
+### Added
+
+- `http_challenge` NodeAttestor plugin (#4909)
+- Experimental support for validating container image signatures through Sigstore selectors in the docker Workload Attestor (#5272)
+- Metrics for monitoring the event-based cache (#5411)
+
+### Changed
+
+- Delegated Identity API to allow subscription by process ID (#5272)
+- Agent Debug endpoint to count SVIDs by type (#5352)
+- Agent health check to report an unhealthy status until the Agent SVID is attested (#5298)
+- Small documentation improvements (#5393)
+
+### Fixed
+
+- `aws_iid` NodeAttestor to properly handle multiple network interfaces (#5300)
+- Server configuration to correctly propagate the `sql_transaction_timeout` setting in the experimental events-based cache (#5345)
+
 ## [1.10.1] - 2024-08-01
 
 ### Added

doc/telemetry/telemetry.md

@@ -51,7 +51,15 @@ The following metrics are emitted:
 | Call Counter | `datastore`, `registration_entry_event`, `list` | | The Datastore is listing registration entry events. |
 | Call Counter | `datastore`, `registration_entry_event`, `prune` | | The Datastore is pruning expired registration entry events. |
 | Call Counter | `datastore`, `registration_entry_event`, `fetch` | | The Datastore is fetching a specific registration entry event. |
 | Call Counter | `entry`, `cache`, `reload` | | The Server is reloading its in-memory entry cache from the datastore. |
+| Gauge | `node`, `agents_by_id_cache`, `count` | | The Server is re-hydrating the agents-by-id event-based cache. |
+| Gauge | `node`, `agents_by_expiresat_cache`, `count` | | The Server is re-hydrating the agents-by-expiresat event-based cache. |
+| Gauge | `node`, `skipped_node_event_ids`, `count` | | The count of skipped IDs detected in the last `sql_transaction_timeout` period. For databases that autoincrement IDs by more than one, this number will overreport the skipped IDs. [Issue](https://github.com/spiffe/spire/issues/5341) |
+| Gauge | `entry`, `nodealiases_by_entryid_cache`, `count` | | The Server is re-hydrating the nodealiases-by-entryid event-based cache. |
+| Gauge | `entry`, `nodealiases_by_selector_cache`, `count` | | The Server is re-hydrating the nodealiases-by-selector event-based cache. |
+| Gauge | `entry`, `entries_by_entryid_cache`, `count` | | The Server is re-hydrating the entries-by-entryid event-based cache. |
+| Gauge | `entry`, `entries_by_parentid_cache`, `count` | | The Server is re-hydrating the entries-by-parentid event-based cache. |
+| Gauge | `entry`, `skipped_entry_event_ids`, `count` | | The count of skipped IDs detected in the last `sql_transaction_timeout` period. For databases that autoincrement IDs by more than one, this number will overreport the skipped IDs. [Issue](https://github.com/spiffe/spire/issues/5341) |
 | Counter | `manager`, `jwt_key`, `activate` | | The CA manager has successfully activated a JWT Key. |
 | Gauge | `manager`, `x509_ca`, `rotate`, `ttl` | `trust_domain_id` | The CA manager is rotating the X.509 CA with a given TTL for a specific Trust Domain. |
 | Call Counter | `registration_entry`, `manager`, `prune` | | The Registration manager is pruning entries. |
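The key columns above list the parts that the telemetry layer joins into the final metric name; server-side sinks prepend a service prefix. A minimal sketch of that composition, assuming a statsd-style dotted join and the `spire_server` prefix (the exact separator and prefix depend on the configured sink):

```go
package main

import (
	"fmt"
	"strings"
)

// metricName joins key parts from the table into a dotted metric name, the
// way a statsd-style sink would render them. The "spire_server" prefix is
// an assumption about the configured sink, not something this diff shows.
func metricName(parts ...string) string {
	return "spire_server." + strings.Join(parts, ".")
}

func main() {
	fmt.Println(metricName("node", "agents_by_id_cache", "count"))
	// Output: spire_server.node.agents_by_id_cache.count
}
```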

pkg/common/telemetry/names.go

@@ -663,6 +663,24 @@ const (
 	// Cache functionality related to a cache
 	Cache = "cache"
 
+	// AgentsByIDCache functionality related to the agent btree cache indexed by ID
+	AgentsByIDCache = "agents_by_id_cache"
+
+	// AgentsByExpiresAtCache functionality related to the agent btree cache indexed by ExpiresAt
+	AgentsByExpiresAtCache = "agents_by_expiresat_cache"
+
+	// NodeAliasesByEntryIDCache functionality related to the node-aliases btree cache indexed by EntryID
+	NodeAliasesByEntryIDCache = "nodealiases_by_entryid_cache"
+
+	// NodeAliasesBySelectorCache functionality related to the node-aliases btree cache indexed by Selector
+	NodeAliasesBySelectorCache = "nodealiases_by_selector_cache"
+
+	// EntriesByEntryIDCache functionality related to the entries btree cache indexed by EntryID
+	EntriesByEntryIDCache = "entries_by_entryid_cache"
+
+	// EntriesByParentIDCache functionality related to the entries btree cache indexed by ParentID
+	EntriesByParentIDCache = "entries_by_parentid_cache"
+
 	// Cache type tag
 	CacheType = "cache_type"

@@ -861,8 +879,11 @@ const (
 	// ListAgents functionality related to listing agents
 	ListAgents = "list_agents"
 
-	// CountEntries functionality related to counting all registration entries
-	CountEntries = "count_entries"
+	// SkippedEntryEventIDs functionality related to counting missed entry event IDs
+	SkippedEntryEventIDs = "skipped_entry_event_ids"
+
+	// SkippedNodeEventIDs functionality related to counting missed node event IDs
+	SkippedNodeEventIDs = "skipped_node_event_ids"
 
 	// ListAllEntriesWithPages functionality related to listing all registration entries with pagination
 	ListAllEntriesWithPages = "list_all_entries_with_pages"

pkg/common/telemetry/server/entry.go

@@ -7,3 +7,51 @@ import "github.com/spiffe/spire/pkg/common/telemetry"
 func SetEntryDeletedGauge(m telemetry.Metrics, deleted int) {
 	m.SetGauge([]string{telemetry.Entry, telemetry.Deleted}, float32(deleted))
 }
+
+// SetAgentsByIDCacheCountGauge emits a gauge with the number of agents by ID that are
+// currently in the node cache.
+func SetAgentsByIDCacheCountGauge(m telemetry.Metrics, size int) {
+	m.SetGauge([]string{telemetry.Node, telemetry.AgentsByIDCache, telemetry.Count}, float32(size))
+}
+
+// SetAgentsByExpiresAtCacheCountGauge emits a gauge with the number of agents by expiresAt that are
+// currently in the node cache.
+func SetAgentsByExpiresAtCacheCountGauge(m telemetry.Metrics, size int) {
+	m.SetGauge([]string{telemetry.Node, telemetry.AgentsByExpiresAtCache, telemetry.Count}, float32(size))
+}
+
+// SetSkippedNodeEventIDsCacheCountGauge emits a gauge with the number of entries that are
+// currently in the skipped-node events cache.
+func SetSkippedNodeEventIDsCacheCountGauge(m telemetry.Metrics, size int) {
+	m.SetGauge([]string{telemetry.Node, telemetry.SkippedNodeEventIDs, telemetry.Count}, float32(size))
+}
+
+// SetNodeAliasesByEntryIDCacheCountGauge emits a gauge with the number of Node Aliases by EntryID that are
+// currently in the entry cache.
+func SetNodeAliasesByEntryIDCacheCountGauge(m telemetry.Metrics, size int) {
+	m.SetGauge([]string{telemetry.Entry, telemetry.NodeAliasesByEntryIDCache, telemetry.Count}, float32(size))
+}
+
+// SetNodeAliasesBySelectorCacheCountGauge emits a gauge with the number of Node Aliases by Selector that are
+// currently in the entry cache.
+func SetNodeAliasesBySelectorCacheCountGauge(m telemetry.Metrics, size int) {
+	m.SetGauge([]string{telemetry.Entry, telemetry.NodeAliasesBySelectorCache, telemetry.Count}, float32(size))
+}
+
+// SetEntriesByEntryIDCacheCountGauge emits a gauge with the number of entries by entryID that are
+// currently in the entry cache.
+func SetEntriesByEntryIDCacheCountGauge(m telemetry.Metrics, size int) {
+	m.SetGauge([]string{telemetry.Entry, telemetry.EntriesByEntryIDCache, telemetry.Count}, float32(size))
+}
+
+// SetEntriesByParentIDCacheCountGauge emits a gauge with the number of entries by parentID that are
+// currently in the entry cache.
+func SetEntriesByParentIDCacheCountGauge(m telemetry.Metrics, size int) {
+	m.SetGauge([]string{telemetry.Entry, telemetry.EntriesByParentIDCache, telemetry.Count}, float32(size))
+}
+
+// SetSkippedEntryEventIDsCacheCountGauge emits a gauge with the number of entries that are
+// currently in the skipped-entry events cache.
+func SetSkippedEntryEventIDsCacheCountGauge(m telemetry.Metrics, size int) {
+	m.SetGauge([]string{telemetry.Entry, telemetry.SkippedEntryEventIDs, telemetry.Count}, float32(size))
+}
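Each helper added above is a thin wrapper over `telemetry.Metrics.SetGauge` with a fixed key. A self-contained sketch of how such a setter behaves, using a pared-down stand-in for the Metrics interface (the real one has more methods) and an in-memory recorder in the spirit of the fakemetrics package used by the tests later in this diff:

```go
package main

import (
	"fmt"
	"strings"
)

// gaugeSetter is a pared-down stand-in for SPIRE's telemetry.Metrics
// interface; only SetGauge matters for these helpers.
type gaugeSetter interface {
	SetGauge(key []string, val float32)
}

// recorder captures gauge emissions in memory, similar in spirit to the
// test/fakes/fakemetrics package.
type recorder struct {
	gauges map[string]float32
}

func (r *recorder) SetGauge(key []string, val float32) {
	r.gauges[strings.Join(key, ".")] = val
}

// setAgentsByIDCacheCountGauge mirrors the shape of the helpers in this
// diff: a fixed key plus a size converted to float32.
func setAgentsByIDCacheCountGauge(m gaugeSetter, size int) {
	m.SetGauge([]string{"node", "agents_by_id_cache", "count"}, float32(size))
}

func main() {
	r := &recorder{gauges: map[string]float32{}}
	setAgentsByIDCacheCountGauge(r, 42)
	fmt.Println(r.gauges["node.agents_by_id_cache.count"]) // 42
}
```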

pkg/server/authorizedentries/cache.go

@@ -267,7 +267,7 @@ func (c *Cache) removeEntry(entryID string) {
 	}
 }
 
-func (c *Cache) stats() cacheStats {
+func (c *Cache) Stats() cacheStats {
 	return cacheStats{
 		AgentsByID:        c.agentsByID.Len(),
 		AgentsByExpiresAt: c.agentsByExpiresAt.Len(),
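Renaming `stats` to `Stats` exports the method so the endpoints package can read the btree sizes after each cache update. Note that the return type `cacheStats` stays unexported: callers in other packages can invoke `Stats()` and read its exported fields through type inference, they just cannot write the type's name. A minimal sketch of that Go pattern:

```go
package main

import "fmt"

// cacheStats stays unexported, but its fields are exported, matching the
// shape used in this diff.
type cacheStats struct {
	AgentsByID        int
	AgentsByExpiresAt int
}

type Cache struct{ agents int }

// Stats is exported; external callers use the result via type inference
// (s := c.Stats()) even though they cannot name cacheStats directly.
func (c *Cache) Stats() cacheStats {
	return cacheStats{AgentsByID: c.agents, AgentsByExpiresAt: c.agents}
}

func main() {
	c := &Cache{agents: 2}
	s := c.Stats()
	fmt.Println(s.AgentsByID, s.AgentsByExpiresAt) // 2 2
}
```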

pkg/server/authorizedentries/cache_test.go

@@ -173,7 +173,7 @@ func TestCacheInternalStats(t *testing.T) {
 	clk := clock.NewMock(t)
 	t.Run("pristine", func(t *testing.T) {
 		cache := NewCache(clk)
-		require.Zero(t, cache.stats())
+		require.Zero(t, cache.Stats())
 	})
 
 	t.Run("entries and aliases", func(t *testing.T) {

@@ -189,13 +189,13 @@ func TestCacheInternalStats(t *testing.T) {
 		require.Equal(t, cacheStats{
 			EntriesByEntryID:  1,
 			EntriesByParentID: 1,
-		}, cache.stats())
+		}, cache.Stats())
 
 		cache.UpdateEntry(entry2a)
 		require.Equal(t, cacheStats{
 			EntriesByEntryID:  2,
 			EntriesByParentID: 2,
-		}, cache.stats())
+		}, cache.Stats())
 
 		cache.UpdateEntry(entry2b)
 		require.Equal(t, cacheStats{

@@ -203,20 +203,20 @@ func TestCacheInternalStats(t *testing.T) {
 			EntriesByParentID: 1,
 			AliasesByEntryID:  2, // one for each selector
 			AliasesBySelector: 2, // one for each selector
-		}, cache.stats())
+		}, cache.Stats())
 
 		cache.RemoveEntry(entry1.Id)
 		require.Equal(t, cacheStats{
 			AliasesByEntryID:  2, // one for each selector
 			AliasesBySelector: 2, // one for each selector
-		}, cache.stats())
+		}, cache.Stats())
 
 		cache.RemoveEntry(entry2b.Id)
-		require.Zero(t, cache.stats())
+		require.Zero(t, cache.Stats())
 
 		// Remove again and make sure nothing happens.
 		cache.RemoveEntry(entry2b.Id)
-		require.Zero(t, cache.stats())
+		require.Zero(t, cache.Stats())
 	})
 
 	t.Run("agents", func(t *testing.T) {

@@ -225,28 +225,28 @@ func TestCacheInternalStats(t *testing.T) {
 		require.Equal(t, cacheStats{
 			AgentsByID:        1,
 			AgentsByExpiresAt: 1,
-		}, cache.stats())
+		}, cache.Stats())
 
 		cache.UpdateAgent(agent2.String(), now.Add(time.Hour*2), []*types.Selector{sel2})
 		require.Equal(t, cacheStats{
 			AgentsByID:        2,
 			AgentsByExpiresAt: 2,
-		}, cache.stats())
+		}, cache.Stats())
 
 		cache.UpdateAgent(agent2.String(), now.Add(time.Hour*3), []*types.Selector{sel2})
 		require.Equal(t, cacheStats{
 			AgentsByID:        2,
 			AgentsByExpiresAt: 2,
-		}, cache.stats())
+		}, cache.Stats())
 
 		cache.RemoveAgent(agent1.String())
 		require.Equal(t, cacheStats{
 			AgentsByID:        1,
 			AgentsByExpiresAt: 1,
-		}, cache.stats())
+		}, cache.Stats())
 
 		cache.RemoveAgent(agent2.String())
-		require.Zero(t, cache.stats())
+		require.Zero(t, cache.Stats())
 	})
 }

pkg/server/endpoints/authorized_entryfetcher.go

@@ -9,6 +9,7 @@ import (
 	"github.com/sirupsen/logrus"
 	"github.com/spiffe/go-spiffe/v2/spiffeid"
 	"github.com/spiffe/spire-api-sdk/proto/spire/api/types"
+	"github.com/spiffe/spire/pkg/common/telemetry"
 	"github.com/spiffe/spire/pkg/server/api"
 	"github.com/spiffe/spire/pkg/server/authorizedentries"
 	"github.com/spiffe/spire/pkg/server/datastore"

@@ -36,9 +37,9 @@ type eventsBasedCache interface {
 	pruneMissedEvents()
 }
 
-func NewAuthorizedEntryFetcherWithEventsBasedCache(ctx context.Context, log logrus.FieldLogger, clk clock.Clock, ds datastore.DataStore, cacheReloadInterval, pruneEventsOlderThan, sqlTransactionTimeout time.Duration) (*AuthorizedEntryFetcherWithEventsBasedCache, error) {
+func NewAuthorizedEntryFetcherWithEventsBasedCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, clk clock.Clock, ds datastore.DataStore, cacheReloadInterval, pruneEventsOlderThan, sqlTransactionTimeout time.Duration) (*AuthorizedEntryFetcherWithEventsBasedCache, error) {
 	log.Info("Building event-based in-memory entry cache")
-	cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, ds, clk, sqlTransactionTimeout)
+	cache, registrationEntries, attestedNodes, err := buildCache(ctx, log, metrics, ds, clk, sqlTransactionTimeout)
 	if err != nil {
 		return nil, err
 	}

@@ -111,15 +112,15 @@ func (a *AuthorizedEntryFetcherWithEventsBasedCache) updateCache(ctx context.Context) error {
 	return errors.Join(updateRegistrationEntriesCacheErr, updateAttestedNodesCacheErr)
 }
 
-func buildCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, sqlTransactionTimeout time.Duration) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) {
+func buildCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, sqlTransactionTimeout time.Duration) (*authorizedentries.Cache, *registrationEntries, *attestedNodes, error) {
 	cache := authorizedentries.NewCache(clk)
 
-	registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize, sqlTransactionTimeout)
+	registrationEntries, err := buildRegistrationEntriesCache(ctx, log, metrics, ds, clk, cache, buildCachePageSize, sqlTransactionTimeout)
 	if err != nil {
 		return nil, nil, nil, err
 	}
 
-	attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, sqlTransactionTimeout)
+	attestedNodes, err := buildAttestedNodesCache(ctx, log, metrics, ds, clk, cache, sqlTransactionTimeout)
 	if err != nil {
 		return nil, nil, nil, err
 	}
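Threading `metrics` through here grows the constructor to seven positional parameters and `buildCache` to six. The diff keeps the positional style used elsewhere in the package; a hypothetical alternative, shown only as a sketch, is to bundle the shared dependencies in a struct so future additions do not ripple through every signature:

```go
package endpoints

import (
	"time"

	"github.com/andres-erbsen/clock"
	"github.com/sirupsen/logrus"

	"github.com/spiffe/spire/pkg/common/telemetry"
	"github.com/spiffe/spire/pkg/server/datastore"
)

// cacheDeps is a hypothetical refactor, not part of this diff: one struct
// carrying the dependencies that buildCache and its helpers all share.
// Field types mirror the signatures above.
type cacheDeps struct {
	log     logrus.FieldLogger
	metrics telemetry.Metrics
	ds      datastore.DataStore
	clk     clock.Clock

	sqlTransactionTimeout time.Duration
}
```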

pkg/server/endpoints/authorized_entryfetcher_attested_nodes.go

@@ -8,7 +8,9 @@ import (
 	"github.com/andres-erbsen/clock"
 	"github.com/sirupsen/logrus"
 
 	"github.com/spiffe/spire/pkg/common/telemetry"
+	server_telemetry "github.com/spiffe/spire/pkg/common/telemetry/server"
 	"github.com/spiffe/spire/pkg/server/api"
 	"github.com/spiffe/spire/pkg/server/authorizedentries"
 	"github.com/spiffe/spire/pkg/server/datastore"

@@ -17,11 +19,12 @@ import (
 )
 
 type attestedNodes struct {
 	cache *authorizedentries.Cache
 	clk   clock.Clock
 	ds    datastore.DataStore
 	log   logrus.FieldLogger
-	mu    sync.RWMutex
+	metrics telemetry.Metrics
+	mu      sync.RWMutex
 
 	firstEventID   uint
 	firstEventTime time.Time

@@ -33,7 +36,7 @@ type attestedNodes struct {
 
 // buildAttestedNodesCache fetches all attested nodes and adds the unexpired ones to the cache.
 // It runs once at startup.
-func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) {
+func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) {
 	resp, err := ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{})
 	if err != nil {
 		return nil, err

@@ -82,6 +85,7 @@ func buildAttestedNodesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, sqlTransactionTimeout time.Duration) (*attestedNodes, error) {
 		firstEventID:   firstEventID,
 		firstEventTime: firstEventTime,
 		log:            log,
+		metrics:        metrics,
 		lastEventID:    lastEventID,
 		missedEvents:   missedEvents,
 		seenMissedStartupEvents: make(map[uint]struct{}),

@@ -142,6 +146,10 @@ func (a *attestedNodes) updateCache(ctx context.Context) error {
 		a.lastEventID = event.EventID
 	}
 
+	// These two should be the same value but it's valuable to have them both be emitted for incident triage.
+	server_telemetry.SetAgentsByExpiresAtCacheCountGauge(a.metrics, a.cache.Stats().AgentsByExpiresAt)
+	server_telemetry.SetAgentsByIDCacheCountGauge(a.metrics, a.cache.Stats().AgentsByID)
+
 	return nil
 }

@@ -201,6 +209,7 @@ func (a *attestedNodes) replayMissedEvents(ctx context.Context) {
 
 		delete(a.missedEvents, eventID)
 	}
+	server_telemetry.SetSkippedNodeEventIDsCacheCountGauge(a.metrics, len(a.missedEvents))
 }
 
 // updatedCacheEntry updates/deletes/creates an individual attested node in the cache.
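The two agent btrees index the same set of agents, so the paired gauges emitted after each `updateCache` should agree; per the comment in the diff, both are emitted to make divergence visible during incident triage. A hypothetical consumer-side check built on that invariant (not part of SPIRE, names assumed):

```go
package main

import "fmt"

// checkAgentGauges is a hypothetical monitoring-side check: the by-ID and
// by-expiry btrees index the same agents, so their count gauges diverging
// suggests the event-based cache missed or double-applied an update.
func checkAgentGauges(byID, byExpiresAt float32) error {
	if byID != byExpiresAt {
		return fmt.Errorf("agent cache indexes diverged: agents_by_id_cache=%v agents_by_expiresat_cache=%v", byID, byExpiresAt)
	}
	return nil
}

func main() {
	fmt.Println(checkAgentGauges(10, 10)) // <nil>
	fmt.Println(checkAgentGauges(10, 9))  // agent cache indexes diverged: ...
}
```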

pkg/server/endpoints/authorized_entryfetcher_attested_nodes_test.go

@@ -9,62 +9,83 @@ import (
 	"github.com/sirupsen/logrus"
 	"github.com/sirupsen/logrus/hooks/test"
 	"github.com/spiffe/go-spiffe/v2/spiffeid"
+	"github.com/spiffe/spire/pkg/common/telemetry"
 	"github.com/spiffe/spire/pkg/server/authorizedentries"
 	"github.com/spiffe/spire/pkg/server/datastore"
 	"github.com/spiffe/spire/proto/spire/common"
 	"github.com/spiffe/spire/test/clock"
 	"github.com/spiffe/spire/test/fakes/fakedatastore"
+	"github.com/spiffe/spire/test/fakes/fakemetrics"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
 func TestUpdateAttestedNodesCache(t *testing.T) {
-	ctx := context.Background()
-	log, _ := test.NewNullLogger()
-	clk := clock.NewMock(t)
-	ds := fakedatastore.New(t)
-	cache := authorizedentries.NewCache(clk)
-
-	attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, defaultSQLTransactionTimeout)
-	require.NoError(t, err)
-	require.NotNil(t, attestedNodes)
-
-	agentID, err := spiffeid.FromString("spiffe://example.org/myagent")
-	require.NoError(t, err)
-
-	_, err = ds.CreateAttestedNode(ctx, &common.AttestedNode{
-		SpiffeId:     agentID.String(),
-		CertNotAfter: time.Now().Add(5 * time.Hour).Unix(),
-	})
-	require.NoError(t, err)
-
 	for _, tt := range []struct {
 		name                            string
 		errs                            []error
 		expectedLastAttestedNodeEventID uint
+		expectMetrics                   []fakemetrics.MetricItem
 	}{
 		{
 			name:                            "Error Listing Attested Node Events",
 			errs:                            []error{errors.New("listing attested node events")},
 			expectedLastAttestedNodeEventID: uint(0),
+			expectMetrics:                   nil,
 		},
 		{
 			name:                            "Error Fetching Attested Node",
 			errs:                            []error{nil, errors.New("fetching attested node")},
 			expectedLastAttestedNodeEventID: uint(0),
+			expectMetrics:                   nil,
 		},
 		{
 			name:                            "Error Getting Node Selectors",
 			errs:                            []error{nil, nil, errors.New("getting node selectors")},
 			expectedLastAttestedNodeEventID: uint(0),
+			expectMetrics:                   nil,
 		},
 		{
 			name:                            "No Errors",
 			expectedLastAttestedNodeEventID: uint(1),
+			expectMetrics: []fakemetrics.MetricItem{
+				{
+					Type:   fakemetrics.SetGaugeType,
+					Key:    []string{telemetry.Node, telemetry.AgentsByExpiresAtCache, telemetry.Count},
+					Val:    1,
+					Labels: nil,
+				},
+				{
+					Type:   fakemetrics.SetGaugeType,
+					Key:    []string{telemetry.Node, telemetry.AgentsByIDCache, telemetry.Count},
+					Val:    1,
+					Labels: nil,
+				},
+			},
 		},
 	} {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
+			ctx := context.Background()
+			log, _ := test.NewNullLogger()
+			clk := clock.NewMock(t)
+			ds := fakedatastore.New(t)
+			cache := authorizedentries.NewCache(clk)
+			metrics := fakemetrics.New()
+
+			attestedNodes, err := buildAttestedNodesCache(ctx, log, metrics, ds, clk, cache, defaultSQLTransactionTimeout)
+			require.NoError(t, err)
+			require.NotNil(t, attestedNodes)
+
+			agentID, err := spiffeid.FromString("spiffe://example.org/myagent")
+			require.NoError(t, err)
+
+			_, err = ds.CreateAttestedNode(ctx, &common.AttestedNode{
+				SpiffeId:     agentID.String(),
+				CertNotAfter: time.Now().Add(5 * time.Hour).Unix(),
+			})
+			require.NoError(t, err)
+
 			for _, err = range tt.errs {
 				ds.AppendNextError(err)
 			}

@@ -77,6 +98,10 @@ func TestUpdateAttestedNodesCache(t *testing.T) {
 			}
 
 			assert.Equal(t, tt.expectedLastAttestedNodeEventID, attestedNodes.lastEventID)
+
+			if tt.expectMetrics != nil {
+				assert.Subset(t, metrics.AllMetrics(), tt.expectMetrics)
+			}
 		})
 	}
 }

@@ -88,8 +113,9 @@ func TestAttestedNodesCacheMissedEventNotFound(t *testing.T) {
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
 	cache := authorizedentries.NewCache(clk)
+	metrics := fakemetrics.New()
 
-	attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, defaultSQLTransactionTimeout)
+	attestedNodes, err := buildAttestedNodesCache(ctx, log, metrics, ds, clk, cache, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, attestedNodes)

@@ -105,6 +131,7 @@ func TestAttestedNodesSavesMissedStartupEvents(t *testing.T) {
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
 	cache := authorizedentries.NewCache(clk)
+	metrics := fakemetrics.New()
 
 	err := ds.CreateAttestedNodeEventForTesting(ctx, &datastore.AttestedNodeEvent{
 		EventID: 3,

@@ -112,7 +139,7 @@ func TestAttestedNodesSavesMissedStartupEvents(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	attestedNodes, err := buildAttestedNodesCache(ctx, log, ds, clk, cache, defaultSQLTransactionTimeout)
+	attestedNodes, err := buildAttestedNodesCache(ctx, log, metrics, ds, clk, cache, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, attestedNodes)
 	require.Equal(t, uint(3), attestedNodes.firstEventID)
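The table-driven test above asserts with `assert.Subset`, which only requires the expected items to appear somewhere in `metrics.AllMetrics()`; the fetcher test later in this diff uses `assert.ElementsMatch`, which demands an exact order-free match. A small standalone illustration of the difference, with hypothetical values:

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// TestSubsetVsElementsMatch contrasts the two testify assertions used in
// this diff: Subset tolerates extra emissions, ElementsMatch does not.
func TestSubsetVsElementsMatch(t *testing.T) {
	actual := []string{"gauge_a", "gauge_b", "gauge_c"}

	// Passes: extras in actual are ignored.
	assert.Subset(t, actual, []string{"gauge_b"})

	// Passes: same multiset, any order; a missing or extra element would fail.
	assert.ElementsMatch(t, actual, []string{"gauge_c", "gauge_a", "gauge_b"})
}
```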

pkg/server/endpoints/authorized_entryfetcher_registration_entries.go

@@ -9,6 +9,7 @@ import (
 	"github.com/andres-erbsen/clock"
 	"github.com/sirupsen/logrus"
 	"github.com/spiffe/spire/pkg/common/telemetry"
+	server_telemetry "github.com/spiffe/spire/pkg/common/telemetry/server"
 	"github.com/spiffe/spire/pkg/server/api"
 	"github.com/spiffe/spire/pkg/server/authorizedentries"
 	"github.com/spiffe/spire/pkg/server/datastore"

@@ -17,11 +18,12 @@ import (
 )
 
 type registrationEntries struct {
 	cache *authorizedentries.Cache
 	clk   clock.Clock
 	ds    datastore.DataStore
 	log   logrus.FieldLogger
-	mu    sync.RWMutex
+	metrics telemetry.Metrics
+	mu      sync.RWMutex
 
 	firstEventID   uint
 	firstEventTime time.Time

@@ -31,9 +33,8 @@ type registrationEntries struct {
 	sqlTransactionTimeout time.Duration
 }
 
-// buildRegistrationEntriesCache fetches all registration entries and adds them to the cache.
-// It runs once at startup.
-func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, pageSize int32, sqlTransactionTimeout time.Duration) (*registrationEntries, error) {
+// buildRegistrationEntriesCache fetches all registration entries and adds them to the cache
+func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, metrics telemetry.Metrics, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, pageSize int32, sqlTransactionTimeout time.Duration) (*registrationEntries, error) {
 	resp, err := ds.ListRegistrationEntriesEvents(ctx, &datastore.ListRegistrationEntriesEventsRequest{})
 	if err != nil {
 		return nil, err

@@ -95,6 +96,7 @@ func buildRegistrationEntriesCache(ctx context.Context, log logrus.FieldLogger, ds datastore.DataStore, clk clock.Clock, cache *authorizedentries.Cache, pageSize int32, sqlTransactionTimeout time.Duration) (*registrationEntries, error) {
 		firstEventID:   firstEventID,
 		firstEventTime: firstEventTime,
 		log:            log,
+		metrics:        metrics,
 		lastEventID:    lastEventID,
 		missedEvents:   missedEvents,
 		seenMissedStartupEvents: make(map[uint]struct{}),

@@ -155,6 +157,14 @@ func (a *registrationEntries) updateCache(ctx context.Context) error {
 		a.lastEventID = event.EventID
 	}
 
+	// These two should be the same value but it's valuable to have them both be emitted for incident triage.
+	server_telemetry.SetNodeAliasesByEntryIDCacheCountGauge(a.metrics, a.cache.Stats().AliasesByEntryID)
+	server_telemetry.SetNodeAliasesBySelectorCacheCountGauge(a.metrics, a.cache.Stats().AliasesBySelector)
+
+	// These two should be the same value but it's valuable to have them both be emitted for incident triage.
+	server_telemetry.SetEntriesByEntryIDCacheCountGauge(a.metrics, a.cache.Stats().EntriesByEntryID)
+	server_telemetry.SetEntriesByParentIDCacheCountGauge(a.metrics, a.cache.Stats().EntriesByParentID)
+
 	return nil
 }

@@ -214,6 +224,7 @@ func (a *registrationEntries) replayMissedEvents(ctx context.Context) {
 
 		delete(a.missedEvents, eventID)
 	}
+	server_telemetry.SetSkippedEntryEventIDsCacheCountGauge(a.metrics, len(a.missedEvents))
 }
 
 // updateCacheEntry updates/deletes/creates an individual registration entry in the cache.

pkg/server/endpoints/authorized_entryfetcher_registration_entries_test.go

@@ -16,6 +16,7 @@ import (
 	"github.com/spiffe/spire/proto/spire/common"
 	"github.com/spiffe/spire/test/clock"
 	"github.com/spiffe/spire/test/fakes/fakedatastore"
+	"github.com/spiffe/spire/test/fakes/fakemetrics"
 	"github.com/stretchr/testify/require"
 )

@@ -70,7 +71,9 @@ func TestBuildRegistrationEntriesCache(t *testing.T) {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
 			cache := authorizedentries.NewCache(clk)
-			registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, tt.pageSize, defaultSQLTransactionTimeout)
+			metrics := fakemetrics.New()
+
+			registrationEntries, err := buildRegistrationEntriesCache(ctx, log, metrics, ds, clk, cache, tt.pageSize, defaultSQLTransactionTimeout)
 			if tt.err != "" {
 				require.ErrorContains(t, err, tt.err)
 				return

@@ -104,8 +107,9 @@ func TestRegistrationEntriesCacheMissedEventNotFound(t *testing.T) {
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
 	cache := authorizedentries.NewCache(clk)
+	metrics := fakemetrics.New()
 
-	registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize, defaultSQLTransactionTimeout)
+	registrationEntries, err := buildRegistrationEntriesCache(ctx, log, metrics, ds, clk, cache, buildCachePageSize, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, registrationEntries)

@@ -121,6 +125,7 @@ func TestRegistrationEntriesSavesMissedStartupEvents(t *testing.T) {
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
 	cache := authorizedentries.NewCache(clk)
+	metrics := fakemetrics.New()
 
 	err := ds.CreateRegistrationEntryEventForTesting(ctx, &datastore.RegistrationEntryEvent{
 		EventID: 3,

@@ -128,7 +133,7 @@ func TestRegistrationEntriesSavesMissedStartupEvents(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	registrationEntries, err := buildRegistrationEntriesCache(ctx, log, ds, clk, cache, buildCachePageSize, defaultSQLTransactionTimeout)
+	registrationEntries, err := buildRegistrationEntriesCache(ctx, log, metrics, ds, clk, cache, buildCachePageSize, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, registrationEntries)
 	require.Equal(t, uint(3), registrationEntries.firstEventID)

pkg/server/endpoints/authorized_entryfetcher_test.go

@@ -10,10 +10,12 @@ import (
 	"github.com/sirupsen/logrus/hooks/test"
 	"github.com/spiffe/go-spiffe/v2/spiffeid"
 	"github.com/spiffe/spire/pkg/common/idutil"
+	"github.com/spiffe/spire/pkg/common/telemetry"
 	"github.com/spiffe/spire/pkg/server/datastore"
 	"github.com/spiffe/spire/proto/spire/common"
 	"github.com/spiffe/spire/test/clock"
 	"github.com/spiffe/spire/test/fakes/fakedatastore"
+	"github.com/spiffe/spire/test/fakes/fakemetrics"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )

@@ -23,8 +25,9 @@ func TestNewAuthorizedEntryFetcherWithEventsBasedCache(t *testing.T) {
 	log, _ := test.NewNullLogger()
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
+	metrics := fakemetrics.New()
 
-	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
+	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
 	assert.NoError(t, err)
 	assert.NotNil(t, ef)

@@ -94,6 +97,21 @@ func TestNewAuthorizedEntryFetcherWithEventsBasedCache(t *testing.T) {
 	entries, err := ef.FetchAuthorizedEntries(ctx, agentID)
 	assert.NoError(t, err)
 	assert.Equal(t, 2, len(entries))
+
+	// Assert metrics
+	expectedMetrics := []fakemetrics.MetricItem{
+		agentsByIDMetric(1),
+		agentsByIDExpiresAtMetric(1),
+		nodeAliasesByEntryIDMetric(1),
+		nodeAliasesBySelectorMetric(1),
+		entriesByEntryIDMetric(2),
+		entriesByParentIDMetric(2),
+		// Here we have 2 skipped events, one for nodes, one for entries
+		nodeSkippedEventMetric(0),
+		entriesSkippedEventMetric(0),
+	}
+
+	assert.ElementsMatch(t, expectedMetrics, metrics.AllMetrics(), "should emit metrics for node aliases, entries, and agents")
 }
 
 func TestNewAuthorizedEntryFetcherWithEventsBasedCacheErrorBuildingCache(t *testing.T) {

@@ -101,13 +119,18 @@ func TestNewAuthorizedEntryFetcherWithEventsBasedCacheErrorBuildingCache(t *testing.T) {
 	log, _ := test.NewNullLogger()
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
+	metrics := fakemetrics.New()
 
 	buildErr := errors.New("build error")
 	ds.SetNextError(buildErr)
 
-	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
+	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
 	assert.Error(t, err)
 	assert.Nil(t, ef)
+
+	// Assert metrics
+	expectedMetrics := []fakemetrics.MetricItem{}
+	assert.ElementsMatch(t, expectedMetrics, metrics.AllMetrics(), "should emit no metrics")
 }
 
 func TestBuildCacheSavesMissedEvents(t *testing.T) {

@@ -115,6 +138,7 @@ func TestBuildCacheSavesMissedEvents(t *testing.T) {
 	log, _ := test.NewNullLogger()
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
+	metrics := fakemetrics.New()
 
 	// Create Registration Entry Events with a gap
 	err := ds.CreateRegistrationEntryEventForTesting(ctx, &datastore.RegistrationEntryEvent{

@@ -142,7 +166,7 @@ func TestBuildCacheSavesMissedEvents(t *testing.T) {
 	})
 	require.NoError(t, err)
 
-	_, registrationEntries, attestedNodes, err := buildCache(ctx, log, ds, clk, defaultSQLTransactionTimeout)
+	_, registrationEntries, attestedNodes, err := buildCache(ctx, log, metrics, ds, clk, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, registrationEntries)
 	require.NotNil(t, attestedNodes)

@@ -153,6 +177,10 @@ func TestBuildCacheSavesMissedEvents(t *testing.T) {
 	assert.Contains(t, attestedNodes.missedEvents, uint(2))
 	assert.Contains(t, attestedNodes.missedEvents, uint(3))
 	assert.Equal(t, uint(4), attestedNodes.lastEventID)
+
+	// Assert metrics since the updateCache() method doesn't get called right at build time.
+	expectedMetrics := []fakemetrics.MetricItem{}
+	assert.ElementsMatch(t, expectedMetrics, metrics.AllMetrics(), "should emit no metrics")
 }
 
 func TestRunUpdateCacheTaskPrunesExpiredAgents(t *testing.T) {

@@ -161,8 +189,9 @@ func TestRunUpdateCacheTaskPrunesExpiredAgents(t *testing.T) {
 	log.SetLevel(logrus.DebugLevel)
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
+	metrics := fakemetrics.New()
 
-	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
+	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, ef)

@@ -226,8 +255,9 @@ func TestUpdateRegistrationEntriesCacheMissedEvents(t *testing.T) {
 	log, _ := test.NewNullLogger()
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
+	metrics := fakemetrics.New()
 
-	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
+	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, ef)

@@ -309,6 +339,7 @@ func TestUpdateRegistrationEntriesCacheMissedStartupEvents(t *testing.T) {
 	log, _ := test.NewNullLogger()
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
+	metrics := fakemetrics.New()
 
 	agentID := spiffeid.RequireFromString("spiffe://example.org/myagent")

@@ -345,7 +376,7 @@ func TestUpdateRegistrationEntriesCacheMissedStartupEvents(t *testing.T) {
 	require.NoError(t, err)
 
 	// Create entry fetcher
-	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
+	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, ef)

@@ -415,8 +446,9 @@ func TestUpdateAttestedNodesCacheMissedEvents(t *testing.T) {
 	log, _ := test.NewNullLogger()
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
+	metrics := fakemetrics.New()
 
-	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
+	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, ef)

@@ -531,6 +563,7 @@ func TestUpdateAttestedNodesCacheMissedStartupEvents(t *testing.T) {
 	log, _ := test.NewNullLogger()
 	clk := clock.NewMock(t)
 	ds := fakedatastore.New(t)
+	metrics := fakemetrics.New()
 
 	agent1 := spiffeid.RequireFromString("spiffe://example.org/myagent1")
 	agent2 := spiffeid.RequireFromString("spiffe://example.org/myagent2")

@@ -594,7 +627,7 @@ func TestUpdateAttestedNodesCacheMissedStartupEvents(t *testing.T) {
 	require.NoError(t, err)
 
 	// Create entry fetcher
-	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
+	ef, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, log, metrics, clk, ds, defaultCacheReloadInterval, defaultPruneEventsOlderThan, defaultSQLTransactionTimeout)
 	require.NoError(t, err)
 	require.NotNil(t, ef)

@@ -652,3 +685,75 @@ func TestUpdateAttestedNodesCacheMissedStartupEvents(t *testing.T) {
 	require.Equal(t, entry.EntryId, entries[0].Id)
 	require.Equal(t, entry.SpiffeId, idutil.RequireIDProtoString(entries[0].SpiffeId))
 }
+
+// AgentsByIDCacheCount
+func agentsByIDMetric(val float32) fakemetrics.MetricItem {
+	return fakemetrics.MetricItem{
+		Type:   fakemetrics.SetGaugeType,
+		Key:    []string{telemetry.Node, telemetry.AgentsByIDCache, telemetry.Count},
+		Val:    val,
+		Labels: nil,
+	}
+}
+
+func agentsByIDExpiresAtMetric(val float32) fakemetrics.MetricItem {
+	return fakemetrics.MetricItem{
+		Type:   fakemetrics.SetGaugeType,
+		Key:    []string{telemetry.Node, telemetry.AgentsByExpiresAtCache, telemetry.Count},
+		Val:    val,
+		Labels: nil,
+	}
+}
+
+func nodeAliasesByEntryIDMetric(val float32) fakemetrics.MetricItem {
+	return fakemetrics.MetricItem{
+		Type:   fakemetrics.SetGaugeType,
+		Key:    []string{telemetry.Entry, telemetry.NodeAliasesByEntryIDCache, telemetry.Count},
+		Val:    val,
+		Labels: nil,
+	}
+}
+
+func nodeSkippedEventMetric(val float32) fakemetrics.MetricItem {
+	return fakemetrics.MetricItem{
+		Type:   fakemetrics.SetGaugeType,
+		Key:    []string{telemetry.Node, telemetry.SkippedNodeEventIDs, telemetry.Count},
+		Val:    val,
+		Labels: nil,
+	}
+}
+
+func nodeAliasesBySelectorMetric(val float32) fakemetrics.MetricItem {
+	return fakemetrics.MetricItem{
+		Type:   fakemetrics.SetGaugeType,
+		Key:    []string{telemetry.Entry, telemetry.NodeAliasesBySelectorCache, telemetry.Count},
+		Val:    val,
+		Labels: nil,
+	}
+}
+
+func entriesByEntryIDMetric(val float32) fakemetrics.MetricItem {
+	return fakemetrics.MetricItem{
+		Type:   fakemetrics.SetGaugeType,
+		Key:    []string{telemetry.Entry, telemetry.EntriesByEntryIDCache, telemetry.Count},
+		Val:    val,
+		Labels: nil,
+	}
+}
+
+func entriesByParentIDMetric(val float32) fakemetrics.MetricItem {
+	return fakemetrics.MetricItem{
+		Type:   fakemetrics.SetGaugeType,
+		Key:    []string{telemetry.Entry, telemetry.EntriesByParentIDCache, telemetry.Count},
+		Val:    val,
+		Labels: nil,
+	}
+}
+
+func entriesSkippedEventMetric(val float32) fakemetrics.MetricItem {
+	return fakemetrics.MetricItem{
+		Type:   fakemetrics.SetGaugeType,
+		Key:    []string{telemetry.Entry, telemetry.SkippedEntryEventIDs, telemetry.Count},
+		Val:    val,
+		Labels: nil,
+	}
+}
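The eight helper constructors above differ only in the metric key. A hypothetical consolidation into one parameterized constructor is sketched below; the diff's one-helper-per-metric style trades that brevity for call sites that read like the metric names they assert:

```go
package endpoints

import (
	"github.com/spiffe/spire/test/fakes/fakemetrics"
)

// gaugeItem is a hypothetical single constructor replacing the eight
// helpers above; each call site would then spell out its key parts, e.g.
// gaugeItem([]string{telemetry.Node, telemetry.AgentsByIDCache, telemetry.Count}, 1).
func gaugeItem(key []string, val float32) fakemetrics.MetricItem {
	return fakemetrics.MetricItem{
		Type:   fakemetrics.SetGaugeType,
		Key:    key,
		Val:    val,
		Labels: nil,
	}
}
```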

pkg/server/endpoints/endpoints.go

@@ -134,7 +134,7 @@ func New(ctx context.Context, c Config) (*Endpoints, error) {
 	var ef api.AuthorizedEntryFetcher
 	var cacheRebuildTask, pruneEventsTask func(context.Context) error
 	if c.EventsBasedCache {
-		efEventsBasedCache, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, c.Log, c.Clock, ds, c.CacheReloadInterval, c.PruneEventsOlderThan, c.SQLTransactionTimeout)
+		efEventsBasedCache, err := NewAuthorizedEntryFetcherWithEventsBasedCache(ctx, c.Log, c.Metrics, c.Clock, ds, c.CacheReloadInterval, c.PruneEventsOlderThan, c.SQLTransactionTimeout)
 		if err != nil {
 			return nil, err
 		}
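The call site passes `c.Metrics`, which assumes the endpoints `Config` already carries the server's telemetry sink; the diff does not show that field being added. A sketch of the fields referenced by this call, with the field set abridged and partly assumed:

```go
package endpoints

import (
	"time"

	"github.com/andres-erbsen/clock"
	"github.com/sirupsen/logrus"

	"github.com/spiffe/spire/pkg/common/telemetry"
)

// Config sketch: only the fields referenced by the call above are shown,
// with Metrics being the dependency this diff threads through to the
// event-based cache. Other fields are elided.
type Config struct {
	Log                   logrus.FieldLogger
	Metrics               telemetry.Metrics
	Clock                 clock.Clock
	EventsBasedCache      bool
	CacheReloadInterval   time.Duration
	PruneEventsOlderThan  time.Duration
	SQLTransactionTimeout time.Duration
}
```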