policy-controller: Use `AHashMap` for indices (#7616)

While looking into replacing `dashmap` in `kube-rs`
(kube-rs/kube-rs#785), I realized that we're probably better off using
[`ahash`][ahash] in our indexing code. AHash pruports to the be "the
fastest DoS-resistant hash currently available in Rust." According to
<https://lib.rs/crates/ahash>, it has substantial usage.

This change should help to minimize lock contention (i.e. by speeding up
hashing while a lock is held). In general, this looks to be a superior
default to `std::collections::HashMap`.

Signed-off-by: Oliver Gould <ver@buoyant.io>

[ahash]: e77cab8c1e/compare/readme.md
This commit is contained in:
Oliver Gould 2022-01-17 09:08:43 -08:00 committed by GitHub
parent f2ad5a9b2a
commit d2a0fa08d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 36 additions and 20 deletions

View File

@ -2,6 +2,17 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.18"
@ -870,6 +881,7 @@ dependencies = [
name = "linkerd-policy-controller-core"
version = "0.1.0"
dependencies = [
"ahash",
"anyhow",
"async-trait",
"futures",
@ -910,6 +922,7 @@ dependencies = [
name = "linkerd-policy-controller-k8s-index"
version = "0.1.0"
dependencies = [
"ahash",
"anyhow",
"async-stream",
"async-trait",

View File

@ -6,6 +6,7 @@ license = "Apache-2.0"
publish = false
[dependencies]
ahash = "0.7"
anyhow = "1"
async-trait = "0.1"
futures = { version = "0.3", default-features = false, features = ["std"] }

View File

@ -5,10 +5,11 @@ mod identity_match;
mod network_match;
pub use self::{identity_match::IdentityMatch, network_match::NetworkMatch};
use ahash::AHashMap as HashMap;
use anyhow::Result;
use futures::prelude::*;
pub use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use std::{collections::HashMap, hash::Hash, pin::Pin, time::Duration};
use std::{hash::Hash, pin::Pin, time::Duration};
/// Models inbound server configuration discovery.
#[async_trait::async_trait]

View File

@ -6,6 +6,7 @@ license = "Apache-2.0"
publish = false
[dependencies]
ahash = "0.7"
anyhow = "1"
async-stream = "0.3"
async-trait = "0.1"

View File

@ -1,4 +1,5 @@
use crate::{server::ServerSelector, ClusterInfo, Errors, Index, SrvIndex};
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use anyhow::{anyhow, bail, Result};
use linkerd_policy_controller_core::{
ClientAuthentication, ClientAuthorization, IdentityMatch, IpNet, NetworkMatch,
@ -8,7 +9,7 @@ use linkerd_policy_controller_k8s_api::{
policy::{self, authz::MeshTls},
ResourceExt,
};
use std::collections::{hash_map::Entry as HashEntry, HashMap, HashSet};
use std::collections::hash_map::Entry;
use tracing::{debug, instrument, trace};
/// Indexes `ServerAuthorization` resources within a namespace.
@ -142,12 +143,12 @@ impl AuthzIndex {
let authz = mk_authz(authz, cluster)?;
match self.index.entry(name) {
HashEntry::Vacant(entry) => {
Entry::Vacant(entry) => {
servers.add_authz(entry.key(), &authz.servers, authz.clients.clone());
entry.insert(authz);
}
HashEntry::Occupied(mut entry) => {
Entry::Occupied(mut entry) => {
// If the authorization changed materially, then update it in all servers.
if entry.get() != &authz {
servers.add_authz(entry.key(), &authz.servers, authz.clients.clone());

View File

@ -1,11 +1,12 @@
use crate::{ServerRx, ServerTx};
use ahash::AHashMap as HashMap;
use anyhow::{anyhow, Error, Result};
use linkerd_policy_controller_core::{
ClientAuthentication, ClientAuthorization, IdentityMatch, InboundServer, IpNet, NetworkMatch,
ProxyProtocol,
};
use linkerd_policy_controller_k8s_api as k8s;
use std::{collections::HashMap, hash::Hash};
use std::hash::Hash;
use tokio::{sync::watch, time};
/// Indicates the default behavior to apply when no Server is found for a port.

View File

@ -1,11 +1,9 @@
use crate::PodServerRx;
use ahash::AHashMap as HashMap;
use anyhow::{anyhow, Result};
use linkerd_policy_controller_core::{DiscoverInboundServer, InboundServer, InboundServerStream};
use parking_lot::RwLock;
use std::{
collections::hash_map::{Entry, HashMap},
sync::Arc,
};
use std::{collections::hash_map::Entry, sync::Arc};
#[derive(Debug, Default)]
pub(crate) struct Writer(ByNs);

View File

@ -1,5 +1,5 @@
use crate::{authz::AuthzIndex, pod::PodIndex, server::SrvIndex, DefaultPolicy};
use std::collections::HashMap;
use ahash::AHashMap as HashMap;
#[derive(Debug)]
pub(crate) struct NamespaceIndex {

View File

@ -2,9 +2,10 @@ use crate::{
defaults::PortDefaults, lookup, DefaultPolicy, DefaultPolicyWatches, Errors, Index, Namespace,
PodServerTx, ServerRx, SrvIndex,
};
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use anyhow::{anyhow, bail, Context, Result};
use linkerd_policy_controller_k8s_api::{self as k8s, policy, ResourceExt};
use std::collections::{hash_map::Entry as HashEntry, HashMap, HashSet};
use std::collections::hash_map::Entry;
use tokio::sync::watch;
use tracing::{debug, instrument, trace};
@ -193,7 +194,7 @@ impl PodIndex {
let ns_name = pod.namespace().expect("pod must have a namespace");
let pod_name = pod.name();
match self.index.entry(pod_name) {
HashEntry::Vacant(pod_entry) => {
Entry::Vacant(pod_entry) => {
let spec = pod.spec.ok_or_else(|| anyhow!("pod missing spec"))?;
// Check the pod for a default-allow annotation. If it's set, use it; otherwise use
@ -239,7 +240,7 @@ impl PodIndex {
Ok(())
}
HashEntry::Occupied(mut entry) => {
Entry::Occupied(mut entry) => {
debug_assert!(
lookups.contains(&ns_name, entry.key()),
"pod must exist in lookups"

View File

@ -1,11 +1,9 @@
use crate::{authz::AuthzIndex, Errors, Index, Namespace, ServerRx, ServerTx};
use ahash::{AHashMap as HashMap, AHashSet as HashSet};
use anyhow::{anyhow, bail, Result};
use linkerd_policy_controller_core::{ClientAuthorization, InboundServer, ProxyProtocol};
use linkerd_policy_controller_k8s_api::{self as k8s, policy, ResourceExt};
use std::{
collections::{hash_map::Entry as HashEntry, HashMap, HashSet},
sync::Arc,
};
use std::{collections::hash_map::Entry, sync::Arc};
use tokio::{sync::watch, time};
use tracing::{debug, instrument, trace};
@ -184,7 +182,7 @@ impl SrvIndex {
let protocol = Self::mk_protocol(srv.spec.proxy_protocol.as_ref());
match self.index.entry(srv_name) {
HashEntry::Vacant(entry) => {
Entry::Vacant(entry) => {
let labels = k8s::Labels::from(srv.metadata.labels);
let authzs = ns_authzs
.filter_for_server(entry.key(), labels.clone())
@ -209,7 +207,7 @@ impl SrvIndex {
});
}
HashEntry::Occupied(mut entry) => {
Entry::Occupied(mut entry) => {
trace!(srv = ?entry.get(), "Updating existing server");
// If something about the server changed, we need to update the config to reflect

View File

@ -1,11 +1,12 @@
use super::*;
use ahash::AHashMap as HashMap;
use futures::prelude::*;
use linkerd_policy_controller_core::{
ClientAuthentication, ClientAuthorization, IdentityMatch, IpNet, Ipv4Net, Ipv6Net,
NetworkMatch, ProxyProtocol,
};
use linkerd_policy_controller_k8s_api::{policy::server::Port, ResourceExt};
use std::{collections::HashMap, net::IpAddr, str::FromStr};
use std::{net::IpAddr, str::FromStr};
use tokio::time;
/// Creates a pod, then a server, then an authorization--then deletes these resources in the reverse