From a31f85d4493c01201919e2591aa6b04641cae297 Mon Sep 17 00:00:00 2001 From: Jaya Kasa Date: Sun, 28 Jun 2026 14:58:24 -0400 Subject: [PATCH 1/5] Move shard allocator to server common --- Cargo.lock | 4 +- core/configs/src/server_config/sharding.rs | 240 +---------------- core/configs/src/server_config/validators.rs | 2 +- core/server-ng/src/bootstrap.rs | 2 +- core/server-ng/src/server_error.rs | 2 +- core/server/Cargo.toml | 6 - core/server/src/lib.rs | 2 +- core/server_common/Cargo.toml | 8 + core/server_common/src/lib.rs | 1 + .../src/shard_allocator.rs | 2 +- .../src/sharding/cpu_allocation.rs | 251 ++++++++++++++++++ core/server_common/src/sharding/mod.rs | 2 + 12 files changed, 276 insertions(+), 246 deletions(-) rename core/{server => server_common}/src/shard_allocator.rs (99%) create mode 100644 core/server_common/src/sharding/cpu_allocation.rs diff --git a/Cargo.lock b/Cargo.lock index be96205a9e..554f29c12e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11516,7 +11516,6 @@ dependencies = [ "futures", "hash32 1.0.0", "human-repr", - "hwlocality", "iggy_binary_protocol", "iggy_common", "jsonwebtoken", @@ -11654,6 +11653,7 @@ dependencies = [ "crossbeam", "err_trail", "human-repr", + "hwlocality", "iggy_binary_protocol", "iggy_common", "lending-iterator", @@ -11661,6 +11661,8 @@ dependencies = [ "nix", "rcgen", "rustls", + "serde", + "serde_json", "serial_test", "smallvec", "thiserror 2.0.18", diff --git a/core/configs/src/server_config/sharding.rs b/core/configs/src/server_config/sharding.rs index 7434f9167a..cc1ea6c6a0 100644 --- a/core/configs/src/server_config/sharding.rs +++ b/core/configs/src/server_config/sharding.rs @@ -16,13 +16,17 @@ // under the License. use iggy_common::IggyDuration; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use serde::{Deserialize, Serialize}; use serde_with::{DisplayFromStr, serde_as}; -use std::str::FromStr; use std::time::Duration; use configs::ConfigEnv; +// `CpuAllocation`/`NumaConfig` live in `server_common` next to the allocator +// that consumes them. Re-exported here to keep the `configs::sharding::*` path +// stable and to avoid a `server_common -> configs -> server_common` cycle. +pub use server_common::sharding::{CpuAllocation, NumaConfig}; + /// Default capacity of the per-shard inter-shard inbox channel. Sized /// comfortably above the consensus working set, which is roughly /// `PIPELINE_PREPARE_QUEUE_MAX (= 8) * replica_count * directions` @@ -178,235 +182,3 @@ impl Default for ShardingConfig { } } } - -#[derive(Debug, Clone, PartialEq, Default)] -pub enum CpuAllocation { - #[default] - All, - Count(usize), - Range(usize, usize), - NumaAware(NumaConfig), -} - -/// NUMA specific configuration -#[derive(Debug, Clone, PartialEq, Default)] -pub struct NumaConfig { - /// Which NUMA nodes to use (empty = auto-detect all) - pub nodes: Vec, - /// Cores per node to use (0 = use all available) - pub cores_per_node: usize, - /// skip hyperthread sibling - pub avoid_hyperthread: bool, -} - -impl CpuAllocation { - fn parse_numa(s: &str) -> Result { - let params = s - .strip_prefix("numa:") - .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?; - - if params == "auto" { - return Ok(CpuAllocation::NumaAware(NumaConfig { - nodes: vec![], - cores_per_node: 0, - avoid_hyperthread: true, - })); - } - - let mut nodes = Vec::new(); - let mut cores_per_node = 0; - let mut avoid_hyperthread = true; - - for param in params.split(';') { - let kv: Vec<&str> = param.split('=').collect(); - if kv.len() != 2 { - return Err(format!( - "Invalid NUMA parameter: '{param}', only available: 'auto'" - )); - } - - match kv[0] { - "nodes" => { - nodes = kv[1] - .split(',') - .map(|n| { - n.parse::() - .map_err(|_| format!("Invalid node number: {n}")) - }) - .collect::, _>>()?; - } - "cores" => { - cores_per_node = kv[1] - .parse::() - .map_err(|_| format!("Invalid cores value: {}", kv[1]))?; - } - "no_ht" => { - avoid_hyperthread = kv[1] - .parse::() - .map_err(|_| format!("Invalid no ht value: {}", kv[1]))?; - } - _ => { - return Err(format!( - "Unknown NUMA parameter: {}, example: numa:nodes=0;cores=4;no_ht=true", - kv[0] - )); - } - } - } - - Ok(CpuAllocation::NumaAware(NumaConfig { - nodes, - cores_per_node, - avoid_hyperthread, - })) - } -} - -impl FromStr for CpuAllocation { - type Err = String; - - fn from_str(s: &str) -> Result { - match s { - "all" => Ok(CpuAllocation::All), - s if s.starts_with("numa:") => Self::parse_numa(s), - s if s.contains("..") => { - let parts: Vec<&str> = s.split("..").collect(); - if parts.len() != 2 { - return Err(format!("Invalid range format: {s}. Expected 'start..end'")); - } - let start = parts[0] - .parse::() - .map_err(|_| format!("Invalid start value: {}", parts[0]))?; - let end = parts[1] - .parse::() - .map_err(|_| format!("Invalid end value: {}", parts[1]))?; - Ok(CpuAllocation::Range(start, end)) - } - s => { - let count = s - .parse::() - .map_err(|_| format!("Invalid shard count: {s}"))?; - Ok(CpuAllocation::Count(count)) - } - } - } -} - -impl Serialize for CpuAllocation { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - match self { - CpuAllocation::All => serializer.serialize_str("all"), - CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64), - CpuAllocation::Range(start, end) => { - serializer.serialize_str(&format!("{start}..{end}")) - } - CpuAllocation::NumaAware(numa) => { - if numa.nodes.is_empty() && numa.cores_per_node == 0 { - serializer.serialize_str("numa:auto") - } else { - let nodes_str = numa - .nodes - .iter() - .map(|n| n.to_string()) - .collect::>() - .join(","); - - let full_str = format!( - "numa:nodes={};cores={};no_ht={}", - nodes_str, numa.cores_per_node, numa.avoid_hyperthread - ); - - serializer.serialize_str(&full_str) - } - } - } - } -} - -impl<'de> Deserialize<'de> for CpuAllocation { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - #[derive(Deserialize)] - #[serde(untagged)] - enum CpuAllocationHelper { - String(String), - Number(usize), - } - - match CpuAllocationHelper::deserialize(deserializer)? { - CpuAllocationHelper::String(s) => { - CpuAllocation::from_str(&s).map_err(serde::de::Error::custom) - } - CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_parse_all() { - assert_eq!(CpuAllocation::from_str("all").unwrap(), CpuAllocation::All); - } - - #[test] - fn test_parse_count() { - assert_eq!( - CpuAllocation::from_str("4").unwrap(), - CpuAllocation::Count(4) - ); - } - - #[test] - fn test_parse_range() { - assert_eq!( - CpuAllocation::from_str("2..8").unwrap(), - CpuAllocation::Range(2, 8) - ); - } - - #[test] - fn test_parse_numa_auto() { - let result = CpuAllocation::from_str("numa:auto").unwrap(); - match result { - CpuAllocation::NumaAware(numa) => { - assert!(numa.nodes.is_empty()); - assert_eq!(numa.cores_per_node, 0); - assert!(numa.avoid_hyperthread); - } - _ => panic!("Expected NumaAware"), - } - } - - #[test] - fn test_parse_numa_explicit() { - let result = CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap(); - match result { - CpuAllocation::NumaAware(numa) => { - assert_eq!(numa.nodes, vec![0, 1]); - assert_eq!(numa.cores_per_node, 4); - assert!(numa.avoid_hyperthread); - } - _ => panic!("Expected NumaAware"), - } - } - - #[test] - fn test_numa_explicit_serde_roundtrip() { - let original = CpuAllocation::NumaAware(NumaConfig { - nodes: vec![0, 1], - cores_per_node: 4, - avoid_hyperthread: true, - }); - let serialized = serde_json::to_string(&original).unwrap(); - let deserialized: CpuAllocation = serde_json::from_str(&serialized).unwrap(); - assert_eq!(original, deserialized); - } -} diff --git a/core/configs/src/server_config/validators.rs b/core/configs/src/server_config/validators.rs index 93c61fb9b5..9b8337ee84 100644 --- a/core/configs/src/server_config/validators.rs +++ b/core/configs/src/server_config/validators.rs @@ -497,7 +497,7 @@ impl Validatable for ShardingConfig { Ok(()) } // NUMA topology validation requires hwlocality (runtime dep). - // Full NUMA validation happens in server::shard_allocator at startup. + // Full NUMA validation happens in server_common::shard_allocator at startup. CpuAllocation::NumaAware(_) => Ok(()), } } diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index 1e93ce8041..fd97f37b02 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -70,10 +70,10 @@ use partitions::{ use rustls::pki_types::ServerName; use server_common::bootstrap::create_directories; use server_common::executor::create_shard_executor; +use server_common::shard_allocator::{ShardAllocator, ShardInfo}; use server_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; // TODO: decouple bootstrap/storage helpers and logging from the `server` crate. use server::log::logger::Logging; -use server::shard_allocator::{ShardAllocator, ShardInfo}; use server::streaming::users::user::User as LegacyUser; use server::{IGGY_ROOT_PASSWORD_ENV, IGGY_ROOT_USERNAME_ENV}; use shard::builder::IggyShardBuilder; diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs index 35e93090e7..6b78e74dcb 100644 --- a/core/server-ng/src/server_error.rs +++ b/core/server-ng/src/server_error.rs @@ -18,7 +18,7 @@ use metadata::impls::recovery::RecoveryError; // TODO: decouple logging errors from the `server` crate. use server::server_error::LogError; -use server::shard_allocator::ShardingError; +use server_common::shard_allocator::ShardingError; use shard::ShardCtorError; use thiserror::Error; diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 647f063759..927ca0a458 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -107,11 +107,5 @@ tracing-subscriber = { workspace = true } ulid = { workspace = true } uuid = { workspace = true } -[target.'cfg(not(target_env = "musl"))'.dependencies] -hwlocality = { workspace = true } - -[target.'cfg(target_env = "musl")'.dependencies] -hwlocality = { workspace = true, features = ["vendored"] } - [build-dependencies] vergen-git2 = { workspace = true } diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index 6807fb6bae..9111d2b0cd 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -41,7 +41,7 @@ pub mod quic; pub mod sender; pub mod server_error; pub mod shard; -pub mod shard_allocator; +pub use server_common::shard_allocator; pub mod state; pub mod streaming; pub mod tcp; diff --git a/core/server_common/Cargo.toml b/core/server_common/Cargo.toml index 9b658b9e0f..485f4f8bf7 100644 --- a/core/server_common/Cargo.toml +++ b/core/server_common/Cargo.toml @@ -38,14 +38,22 @@ lending-iterator = { workspace = true } moka = { workspace = true } rcgen = { workspace = true } rustls = { workspace = true } +serde = { workspace = true } smallvec = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } twox-hash = { workspace = true } +[target.'cfg(not(target_env = "musl"))'.dependencies] +hwlocality = { workspace = true } + +[target.'cfg(target_env = "musl")'.dependencies] +hwlocality = { workspace = true, features = ["vendored"] } + [target.'cfg(unix)'.dependencies] nix = { workspace = true } [dev-dependencies] +serde_json = { workspace = true } serial_test = { workspace = true } tokio = { workspace = true } diff --git a/core/server_common/src/lib.rs b/core/server_common/src/lib.rs index fcb955b5e4..abe25e806a 100644 --- a/core/server_common/src/lib.rs +++ b/core/server_common/src/lib.rs @@ -43,6 +43,7 @@ mod messages_batch_mut; mod messages_batch_set; mod segment_storage; pub mod send_messages2; +pub mod shard_allocator; pub mod sharding; pub use bootstrap::create_directories; diff --git a/core/server/src/shard_allocator.rs b/core/server_common/src/shard_allocator.rs similarity index 99% rename from core/server/src/shard_allocator.rs rename to core/server_common/src/shard_allocator.rs index 26a1f0e389..4d329f9bc4 100644 --- a/core/server/src/shard_allocator.rs +++ b/core/server_common/src/shard_allocator.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use configs::sharding::{CpuAllocation, NumaConfig}; +use crate::sharding::{CpuAllocation, NumaConfig}; use hwlocality::Topology; use hwlocality::bitmap::SpecializedBitmapRef; use hwlocality::cpu::cpuset::CpuSet; diff --git a/core/server_common/src/sharding/cpu_allocation.rs b/core/server_common/src/sharding/cpu_allocation.rs new file mode 100644 index 0000000000..fef2c3f467 --- /dev/null +++ b/core/server_common/src/sharding/cpu_allocation.rs @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::str::FromStr; + +#[derive(Debug, Clone, PartialEq, Default)] +pub enum CpuAllocation { + #[default] + All, + Count(usize), + Range(usize, usize), + NumaAware(NumaConfig), +} + +/// NUMA specific configuration +#[derive(Debug, Clone, PartialEq, Default)] +pub struct NumaConfig { + /// Which NUMA nodes to use (empty = auto-detect all) + pub nodes: Vec, + /// Cores per node to use (0 = use all available) + pub cores_per_node: usize, + /// skip hyperthread sibling + pub avoid_hyperthread: bool, +} + +impl CpuAllocation { + fn parse_numa(s: &str) -> Result { + let params = s + .strip_prefix("numa:") + .ok_or_else(|| "Numa config must start with 'numa:'".to_string())?; + + if params == "auto" { + return Ok(CpuAllocation::NumaAware(NumaConfig { + nodes: vec![], + cores_per_node: 0, + avoid_hyperthread: true, + })); + } + + let mut nodes = Vec::new(); + let mut cores_per_node = 0; + let mut avoid_hyperthread = true; + + for param in params.split(';') { + let kv: Vec<&str> = param.split('=').collect(); + if kv.len() != 2 { + return Err(format!( + "Invalid NUMA parameter: '{param}', only available: 'auto'" + )); + } + + match kv[0] { + "nodes" => { + nodes = kv[1] + .split(',') + .map(|n| { + n.parse::() + .map_err(|_| format!("Invalid node number: {n}")) + }) + .collect::, _>>()?; + } + "cores" => { + cores_per_node = kv[1] + .parse::() + .map_err(|_| format!("Invalid cores value: {}", kv[1]))?; + } + "no_ht" => { + avoid_hyperthread = kv[1] + .parse::() + .map_err(|_| format!("Invalid no ht value: {}", kv[1]))?; + } + _ => { + return Err(format!( + "Unknown NUMA parameter: {}, example: numa:nodes=0;cores=4;no_ht=true", + kv[0] + )); + } + } + } + + Ok(CpuAllocation::NumaAware(NumaConfig { + nodes, + cores_per_node, + avoid_hyperthread, + })) + } +} + +impl FromStr for CpuAllocation { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + "all" => Ok(CpuAllocation::All), + s if s.starts_with("numa:") => Self::parse_numa(s), + s if s.contains("..") => { + let parts: Vec<&str> = s.split("..").collect(); + if parts.len() != 2 { + return Err(format!("Invalid range format: {s}. Expected 'start..end'")); + } + let start = parts[0] + .parse::() + .map_err(|_| format!("Invalid start value: {}", parts[0]))?; + let end = parts[1] + .parse::() + .map_err(|_| format!("Invalid end value: {}", parts[1]))?; + Ok(CpuAllocation::Range(start, end)) + } + s => { + let count = s + .parse::() + .map_err(|_| format!("Invalid shard count: {s}"))?; + Ok(CpuAllocation::Count(count)) + } + } + } +} + +impl Serialize for CpuAllocation { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + CpuAllocation::All => serializer.serialize_str("all"), + CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64), + CpuAllocation::Range(start, end) => { + serializer.serialize_str(&format!("{start}..{end}")) + } + CpuAllocation::NumaAware(numa) => { + if numa.nodes.is_empty() && numa.cores_per_node == 0 { + serializer.serialize_str("numa:auto") + } else { + let nodes_str = numa + .nodes + .iter() + .map(|n| n.to_string()) + .collect::>() + .join(","); + + let full_str = format!( + "numa:nodes={};cores={};no_ht={}", + nodes_str, numa.cores_per_node, numa.avoid_hyperthread + ); + + serializer.serialize_str(&full_str) + } + } + } + } +} + +impl<'de> Deserialize<'de> for CpuAllocation { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + #[derive(Deserialize)] + #[serde(untagged)] + enum CpuAllocationHelper { + String(String), + Number(usize), + } + + match CpuAllocationHelper::deserialize(deserializer)? { + CpuAllocationHelper::String(s) => { + CpuAllocation::from_str(&s).map_err(serde::de::Error::custom) + } + CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_all() { + assert_eq!(CpuAllocation::from_str("all").unwrap(), CpuAllocation::All); + } + + #[test] + fn test_parse_count() { + assert_eq!( + CpuAllocation::from_str("4").unwrap(), + CpuAllocation::Count(4) + ); + } + + #[test] + fn test_parse_range() { + assert_eq!( + CpuAllocation::from_str("2..8").unwrap(), + CpuAllocation::Range(2, 8) + ); + } + + #[test] + fn test_parse_numa_auto() { + let result = CpuAllocation::from_str("numa:auto").unwrap(); + match result { + CpuAllocation::NumaAware(numa) => { + assert!(numa.nodes.is_empty()); + assert_eq!(numa.cores_per_node, 0); + assert!(numa.avoid_hyperthread); + } + _ => panic!("Expected NumaAware"), + } + } + + #[test] + fn test_parse_numa_explicit() { + let result = CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap(); + match result { + CpuAllocation::NumaAware(numa) => { + assert_eq!(numa.nodes, vec![0, 1]); + assert_eq!(numa.cores_per_node, 4); + assert!(numa.avoid_hyperthread); + } + _ => panic!("Expected NumaAware"), + } + } + + #[test] + fn test_numa_explicit_serde_roundtrip() { + let original = CpuAllocation::NumaAware(NumaConfig { + nodes: vec![0, 1], + cores_per_node: 4, + avoid_hyperthread: true, + }); + let serialized = serde_json::to_string(&original).unwrap(); + let deserialized: CpuAllocation = serde_json::from_str(&serialized).unwrap(); + assert_eq!(original, deserialized); + } +} diff --git a/core/server_common/src/sharding/mod.rs b/core/server_common/src/sharding/mod.rs index 761e41d045..5435a072c3 100644 --- a/core/server_common/src/sharding/mod.rs +++ b/core/server_common/src/sharding/mod.rs @@ -15,11 +15,13 @@ // specific language governing permissions and limitations // under the License. +mod cpu_allocation; mod local_idx; mod namespace; mod partition_location; mod shard_id; +pub use cpu_allocation::{CpuAllocation, NumaConfig}; pub use local_idx::LocalIdx; pub use namespace::{ IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, METADATA_CONSENSUS_NAMESPACE, From 3332a74a89857c92a80b70ab40b62ef1f6fc19db Mon Sep 17 00:00:00 2001 From: Jaya Kasa Date: Sun, 28 Jun 2026 16:12:11 -0400 Subject: [PATCH 2/5] refactor(server): extract shard allocator into dedicated crate Moving the allocator into server_common pulled hwlocality into nearly the entire workspace, since almost every crate depends on server_common. That broke the aarch64-musl build: simulator links with -nodefaultlibs and the vendored hwloc drags in glibc's libm.a, failing on an undefined reference to errno. Keep CpuAllocation and NumaConfig in server_common::sharding, which configs re-exports and which carry no hwloc, and move the hwloc-backed ShardAllocator into a new shard_allocator crate that only server and server-ng depend on. This restores the confinement of hwloc to those two binaries that master had. Co-authored-by: Cursor --- Cargo.lock | 14 ++++++- Cargo.toml | 2 + core/configs/src/server_config/validators.rs | 2 +- core/server-ng/Cargo.toml | 1 + core/server-ng/src/bootstrap.rs | 2 +- core/server-ng/src/server_error.rs | 2 +- core/server/Cargo.toml | 1 + core/server/src/lib.rs | 2 +- core/server_common/Cargo.toml | 6 --- core/server_common/src/lib.rs | 1 - core/shard_allocator/Cargo.toml | 38 +++++++++++++++++++ .../src/lib.rs} | 2 +- 12 files changed, 60 insertions(+), 13 deletions(-) create mode 100644 core/shard_allocator/Cargo.toml rename core/{server_common/src/shard_allocator.rs => shard_allocator/src/lib.rs} (99%) diff --git a/Cargo.lock b/Cargo.lock index 554f29c12e..588166d820 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11543,6 +11543,7 @@ dependencies = [ "serde", "serde_json", "server_common", + "shard_allocator", "slab", "socket2 0.6.4", "strum 0.28.0", @@ -11624,6 +11625,7 @@ dependencies = [ "server", "server_common", "shard", + "shard_allocator", "slab", "socket2 0.6.4", "strum 0.28.0", @@ -11653,7 +11655,6 @@ dependencies = [ "crossbeam", "err_trail", "human-repr", - "hwlocality", "iggy_binary_protocol", "iggy_common", "lending-iterator", @@ -11748,6 +11749,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "shard_allocator" +version = "0.1.0" +dependencies = [ + "hwlocality", + "nix", + "server_common", + "thiserror 2.0.18", + "tracing", +] + [[package]] name = "sharded-slab" version = "0.1.7" diff --git a/Cargo.toml b/Cargo.toml index 4d196d9632..2e68f50015 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,6 +59,7 @@ members = [ "core/server-ng", "core/server_common", "core/shard", + "core/shard_allocator", "core/simulator", "core/tools", "examples/rust", @@ -278,6 +279,7 @@ server = { path = "core/server" } server-ng = { path = "core/server-ng" } server_common = { path = "core/server_common" } shard = { path = "core/shard" } +shard_allocator = { path = "core/shard_allocator" } simd-json = { version = "0.17.0", features = ["serde_impl"] } slab = "0.4.12" smallvec = "1.15" diff --git a/core/configs/src/server_config/validators.rs b/core/configs/src/server_config/validators.rs index 9b8337ee84..0aafe1ef5e 100644 --- a/core/configs/src/server_config/validators.rs +++ b/core/configs/src/server_config/validators.rs @@ -497,7 +497,7 @@ impl Validatable for ShardingConfig { Ok(()) } // NUMA topology validation requires hwlocality (runtime dep). - // Full NUMA validation happens in server_common::shard_allocator at startup. + // Full NUMA validation happens in shard_allocator at startup. CpuAllocation::NumaAware(_) => Ok(()), } } diff --git a/core/server-ng/Cargo.toml b/core/server-ng/Cargo.toml index bf42650ede..3d2a2ab5ae 100644 --- a/core/server-ng/Cargo.toml +++ b/core/server-ng/Cargo.toml @@ -149,6 +149,7 @@ serde = { workspace = true } server = { workspace = true } server_common = { workspace = true } shard = { workspace = true } +shard_allocator = { workspace = true } slab = { workspace = true } socket2 = { workspace = true } strum = { workspace = true } diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index fd97f37b02..42c8685b29 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -70,8 +70,8 @@ use partitions::{ use rustls::pki_types::ServerName; use server_common::bootstrap::create_directories; use server_common::executor::create_shard_executor; -use server_common::shard_allocator::{ShardAllocator, ShardInfo}; use server_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; +use shard_allocator::{ShardAllocator, ShardInfo}; // TODO: decouple bootstrap/storage helpers and logging from the `server` crate. use server::log::logger::Logging; use server::streaming::users::user::User as LegacyUser; diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs index 6b78e74dcb..8d1e6f0eed 100644 --- a/core/server-ng/src/server_error.rs +++ b/core/server-ng/src/server_error.rs @@ -18,8 +18,8 @@ use metadata::impls::recovery::RecoveryError; // TODO: decouple logging errors from the `server` crate. use server::server_error::LogError; -use server_common::shard_allocator::ShardingError; use shard::ShardCtorError; +use shard_allocator::ShardingError; use thiserror::Error; #[derive(Debug, Error)] diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 927ca0a458..977e7ce730 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -92,6 +92,7 @@ send_wrapper = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } server_common = { workspace = true } +shard_allocator = { workspace = true } slab = { workspace = true } socket2 = { workspace = true } strum = { workspace = true } diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index 9111d2b0cd..a40e68bef5 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -41,7 +41,7 @@ pub mod quic; pub mod sender; pub mod server_error; pub mod shard; -pub use server_common::shard_allocator; +pub use shard_allocator; pub mod state; pub mod streaming; pub mod tcp; diff --git a/core/server_common/Cargo.toml b/core/server_common/Cargo.toml index 485f4f8bf7..083c25cf30 100644 --- a/core/server_common/Cargo.toml +++ b/core/server_common/Cargo.toml @@ -44,12 +44,6 @@ thiserror = { workspace = true } tracing = { workspace = true } twox-hash = { workspace = true } -[target.'cfg(not(target_env = "musl"))'.dependencies] -hwlocality = { workspace = true } - -[target.'cfg(target_env = "musl")'.dependencies] -hwlocality = { workspace = true, features = ["vendored"] } - [target.'cfg(unix)'.dependencies] nix = { workspace = true } diff --git a/core/server_common/src/lib.rs b/core/server_common/src/lib.rs index abe25e806a..fcb955b5e4 100644 --- a/core/server_common/src/lib.rs +++ b/core/server_common/src/lib.rs @@ -43,7 +43,6 @@ mod messages_batch_mut; mod messages_batch_set; mod segment_storage; pub mod send_messages2; -pub mod shard_allocator; pub mod sharding; pub use bootstrap::create_directories; diff --git a/core/shard_allocator/Cargo.toml b/core/shard_allocator/Cargo.toml new file mode 100644 index 0000000000..462279526e --- /dev/null +++ b/core/shard_allocator/Cargo.toml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "shard_allocator" +version = "0.1.0" +description = "CPU and NUMA shard allocation for the iggy server, backed by hwloc." +edition = "2024" +license = "Apache-2.0" +publish = false + +[dependencies] +server_common = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } + +[target.'cfg(not(target_env = "musl"))'.dependencies] +hwlocality = { workspace = true } + +[target.'cfg(target_env = "musl")'.dependencies] +hwlocality = { workspace = true, features = ["vendored"] } + +[target.'cfg(target_os = "linux")'.dependencies] +nix = { workspace = true } diff --git a/core/server_common/src/shard_allocator.rs b/core/shard_allocator/src/lib.rs similarity index 99% rename from core/server_common/src/shard_allocator.rs rename to core/shard_allocator/src/lib.rs index 4d329f9bc4..8d808ec27f 100644 --- a/core/server_common/src/shard_allocator.rs +++ b/core/shard_allocator/src/lib.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use crate::sharding::{CpuAllocation, NumaConfig}; use hwlocality::Topology; use hwlocality::bitmap::SpecializedBitmapRef; use hwlocality::cpu::cpuset::CpuSet; @@ -23,6 +22,7 @@ use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy}; use hwlocality::object::types::ObjectType::{self, NUMANode}; #[cfg(target_os = "linux")] use nix::{sched::sched_setaffinity, unistd::Pid}; +use server_common::sharding::{CpuAllocation, NumaConfig}; use std::collections::HashSet; use std::sync::Arc; use std::thread::available_parallelism; From c5797871cb075b27cf1c0936c4580f462122bee4 Mon Sep 17 00:00:00 2001 From: Jaya Kasa Date: Mon, 29 Jun 2026 22:02:23 -0400 Subject: [PATCH 3/5] refactor(server): host cpu allocation config in a leaf crate The shard allocator extraction parked CpuAllocation/NumaConfig in server_common, which forced a serde dependency onto a runtime crate and coupled the lightweight allocator to server_common's heavy tree. Move the two pure config types into a dedicated cpu_allocation leaf crate that depends only on serde, re-export them from configs to keep the configs::sharding path stable, and point shard_allocator at the leaf crate. Also drop the whole-crate re-export shim in server::lib in favor of direct paths, and document the new crate's public surface. Co-authored-by: Cursor --- Cargo.lock | 13 +- Cargo.toml | 2 + core/configs/Cargo.toml | 1 + core/configs/src/server_config/sharding.rs | 9 +- core/cpu_allocation/Cargo.toml | 30 ++++ .../src/lib.rs} | 131 +++++++++++++++++- core/server/src/bootstrap.rs | 2 +- core/server/src/lib.rs | 1 - core/server/src/main.rs | 2 +- core/server/src/server_error.rs | 2 +- core/server_common/Cargo.toml | 2 - core/server_common/src/sharding/mod.rs | 2 - core/shard_allocator/Cargo.toml | 2 +- core/shard_allocator/src/lib.rs | 35 ++++- 14 files changed, 213 insertions(+), 21 deletions(-) create mode 100644 core/cpu_allocation/Cargo.toml rename core/{server_common/src/sharding/cpu_allocation.rs => cpu_allocation/src/lib.rs} (67%) diff --git a/Cargo.lock b/Cargo.lock index 588166d820..864a9238f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3088,6 +3088,7 @@ name = "configs" version = "0.1.0" dependencies = [ "configs_derive", + "cpu_allocation", "derive_more", "err_trail", "figment", @@ -3281,6 +3282,14 @@ dependencies = [ "libm", ] +[[package]] +name = "cpu_allocation" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -11662,8 +11671,6 @@ dependencies = [ "nix", "rcgen", "rustls", - "serde", - "serde_json", "serial_test", "smallvec", "thiserror 2.0.18", @@ -11753,9 +11760,9 @@ dependencies = [ name = "shard_allocator" version = "0.1.0" dependencies = [ + "cpu_allocation", "hwlocality", "nix", - "server_common", "thiserror 2.0.18", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 2e68f50015..e4006e5ef3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ members = [ "core/connectors/sources/postgres_source", "core/connectors/sources/random_source", "core/consensus", + "core/cpu_allocation", "core/harness_derive", "core/integration", "core/journal", @@ -134,6 +135,7 @@ configs = { path = "core/configs", version = "0.1.0" } configs_derive = { path = "core/configs_derive", version = "0.1.0" } consensus = { path = "core/consensus" } console-subscriber = "0.5.0" +cpu_allocation = { path = "core/cpu_allocation" } crossbeam = "0.8.4" crossfire = "3.1.16" csv = "1.4.0" diff --git a/core/configs/Cargo.toml b/core/configs/Cargo.toml index c0f606b8ae..7d5d7b104d 100644 --- a/core/configs/Cargo.toml +++ b/core/configs/Cargo.toml @@ -24,6 +24,7 @@ publish = false [dependencies] configs_derive = { workspace = true } +cpu_allocation = { workspace = true } derive_more = { workspace = true } err_trail = { workspace = true } figment = { workspace = true } diff --git a/core/configs/src/server_config/sharding.rs b/core/configs/src/server_config/sharding.rs index cc1ea6c6a0..5dfe876dd7 100644 --- a/core/configs/src/server_config/sharding.rs +++ b/core/configs/src/server_config/sharding.rs @@ -22,10 +22,11 @@ use std::time::Duration; use configs::ConfigEnv; -// `CpuAllocation`/`NumaConfig` live in `server_common` next to the allocator -// that consumes them. Re-exported here to keep the `configs::sharding::*` path -// stable and to avoid a `server_common -> configs -> server_common` cycle. -pub use server_common::sharding::{CpuAllocation, NumaConfig}; +// `CpuAllocation`/`NumaConfig` are pure config types and live in their own +// leaf crate so both `configs` and `shard_allocator` can share them without +// pulling each other's heavier dependency trees. Re-exported here to keep the +// `configs::sharding::*` path stable for existing callers. +pub use cpu_allocation::{CpuAllocation, NumaConfig}; /// Default capacity of the per-shard inter-shard inbox channel. Sized /// comfortably above the consensus working set, which is roughly diff --git a/core/cpu_allocation/Cargo.toml b/core/cpu_allocation/Cargo.toml new file mode 100644 index 0000000000..74eea32309 --- /dev/null +++ b/core/cpu_allocation/Cargo.toml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "cpu_allocation" +version = "0.1.0" +description = "Shard CPU/NUMA allocation config types (CpuAllocation, NumaConfig) parsed from the iggy server config." +edition = "2024" +license = "Apache-2.0" +publish = false + +[dependencies] +serde = { workspace = true } + +[dev-dependencies] +serde_json = { workspace = true } diff --git a/core/server_common/src/sharding/cpu_allocation.rs b/core/cpu_allocation/src/lib.rs similarity index 67% rename from core/server_common/src/sharding/cpu_allocation.rs rename to core/cpu_allocation/src/lib.rs index fef2c3f467..5391e73b12 100644 --- a/core/server_common/src/sharding/cpu_allocation.rs +++ b/core/cpu_allocation/src/lib.rs @@ -15,9 +15,28 @@ // specific language governing permissions and limitations // under the License. +//! `cpu_allocation`: tiny config types that say how many CPU cores the +//! server should grab for its shards, and how. +//! +//! These two types ([`CpuAllocation`] and [`NumaConfig`]) are read from +//! the server config (TOML). The config crate re-exports them, and the +//! `shard_allocator` crate turns them into a real plan. Kept in their +//! own little crate so neither side has to pull in the other's heavy +//! dependencies just to share two small enums. + use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::str::FromStr; +/// Tell server how many CPU cores to grab for shards, and how. +/// +/// Server make one shard per core. This say which cores. Pick one: +/// - `All`: take every core machine have. +/// - `Count(n)`: take first `n` cores. +/// - `Range(a, b)`: take cores `a` up to (not including) `b`. +/// - `NumaAware(..)`: smart pick by NUMA node, keep memory close to core. +/// +/// Parse from a string in TOML, e.g. `"all"`, `4`, `"2..8"`, +/// `"numa:auto"`, or `"numa:nodes=0,1;cores=4;no_ht=true"`. #[derive(Debug, Clone, PartialEq, Default)] pub enum CpuAllocation { #[default] @@ -27,14 +46,18 @@ pub enum CpuAllocation { NumaAware(NumaConfig), } -/// NUMA specific configuration +/// Knobs for NUMA-aware core picking. +/// +/// NUMA = machine split into groups (nodes). Each node have own cores +/// and own memory. Memory of same node is fast; far node is slow. This +/// struct say which nodes to use and how many cores from each. #[derive(Debug, Clone, PartialEq, Default)] pub struct NumaConfig { - /// Which NUMA nodes to use (empty = auto-detect all) + /// Which NUMA nodes to use. Empty means: use all of them. pub nodes: Vec, - /// Cores per node to use (0 = use all available) + /// How many cores to take from each node. `0` means: take all. pub cores_per_node: usize, - /// skip hyperthread sibling + /// `true` means skip hyperthread twins, use only one thread per core. pub avoid_hyperthread: bool, } @@ -248,4 +271,104 @@ mod tests { let deserialized: CpuAllocation = serde_json::from_str(&serialized).unwrap(); assert_eq!(original, deserialized); } + + #[test] + fn test_parse_invalid_range_too_many_parts() { + assert!(CpuAllocation::from_str("1..2..3").is_err()); + } + + #[test] + fn test_parse_invalid_range_start() { + assert!(CpuAllocation::from_str("x..8").is_err()); + } + + #[test] + fn test_parse_invalid_range_end() { + assert!(CpuAllocation::from_str("2..y").is_err()); + } + + #[test] + fn test_parse_invalid_count() { + assert!(CpuAllocation::from_str("abc").is_err()); + } + + #[test] + fn test_parse_numa_missing_prefix() { + assert!(CpuAllocation::parse_numa("nodes=0").is_err()); + } + + #[test] + fn test_parse_numa_param_without_equals() { + assert!(CpuAllocation::from_str("numa:nodes").is_err()); + } + + #[test] + fn test_parse_numa_invalid_node_number() { + assert!(CpuAllocation::from_str("numa:nodes=a").is_err()); + } + + #[test] + fn test_parse_numa_invalid_cores() { + assert!(CpuAllocation::from_str("numa:cores=x").is_err()); + } + + #[test] + fn test_parse_numa_invalid_no_ht() { + assert!(CpuAllocation::from_str("numa:no_ht=maybe").is_err()); + } + + #[test] + fn test_parse_numa_unknown_param() { + assert!(CpuAllocation::from_str("numa:foo=1").is_err()); + } + + #[test] + fn test_serialize_all() { + assert_eq!( + serde_json::to_string(&CpuAllocation::All).unwrap(), + "\"all\"" + ); + } + + #[test] + fn test_serialize_count() { + assert_eq!( + serde_json::to_string(&CpuAllocation::Count(4)).unwrap(), + "4" + ); + } + + #[test] + fn test_serialize_range() { + assert_eq!( + serde_json::to_string(&CpuAllocation::Range(2, 8)).unwrap(), + "\"2..8\"" + ); + } + + #[test] + fn test_serialize_numa_auto() { + let auto = CpuAllocation::NumaAware(NumaConfig { + nodes: vec![], + cores_per_node: 0, + avoid_hyperthread: true, + }); + assert_eq!(serde_json::to_string(&auto).unwrap(), "\"numa:auto\""); + } + + #[test] + fn test_deserialize_number() { + assert_eq!( + serde_json::from_str::("4").unwrap(), + CpuAllocation::Count(4) + ); + } + + #[test] + fn test_deserialize_string() { + assert_eq!( + serde_json::from_str::("\"all\"").unwrap(), + CpuAllocation::All + ); + } } diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs index f35a43a2cd..eb42c0539e 100644 --- a/core/server/src/bootstrap.rs +++ b/core/server/src/bootstrap.rs @@ -33,7 +33,6 @@ use crate::{ frame::ShardFrame, }, }, - shard_allocator::ShardInfo, state::system::{StreamState, TopicState, UserState}, streaming::{ partitions::{ @@ -57,6 +56,7 @@ use iggy_common::{ MIN_USERNAME_LENGTH, }, }; +use shard_allocator::ShardInfo; use slab::Slab; use std::{env, sync::Arc}; use tracing::{info, warn}; diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index a40e68bef5..70a3e6c4dc 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -41,7 +41,6 @@ pub mod quic; pub mod sender; pub mod server_error; pub mod shard; -pub use shard_allocator; pub mod state; pub mod streaming; pub mod tcp; diff --git a/core/server/src/main.rs b/core/server/src/main.rs index fc3dd4a4b4..928912a640 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -39,7 +39,6 @@ use server::metadata::{Metadata, create_metadata_handles}; use server::server_error::ServerError; use server::shard::system::info::SystemInfo; use server::shard::{IggyShard, calculate_shard_assignment}; -use server::shard_allocator::ShardAllocator; use server::state::file::FileState; use server::state::system::SystemState; use server::streaming::clients::client_manager::{Client, ClientManager}; @@ -48,6 +47,7 @@ use server::streaming::storage::SystemStorage; use server::streaming::utils::ptr::EternalPtr; use server_common::MemoryPool; use server_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; +use shard_allocator::ShardAllocator; use std::panic::AssertUnwindSafe; use std::rc::Rc; use std::str::FromStr; diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs index 6a63fac8fa..5dde456e42 100644 --- a/core/server/src/server_error.rs +++ b/core/server/src/server_error.rs @@ -36,7 +36,7 @@ error_set!( NumaError := { #[display("{0}")] - Sharding(crate::shard_allocator::ShardingError), + Sharding(shard_allocator::ShardingError), } ConfigurationError := { diff --git a/core/server_common/Cargo.toml b/core/server_common/Cargo.toml index 083c25cf30..9b658b9e0f 100644 --- a/core/server_common/Cargo.toml +++ b/core/server_common/Cargo.toml @@ -38,7 +38,6 @@ lending-iterator = { workspace = true } moka = { workspace = true } rcgen = { workspace = true } rustls = { workspace = true } -serde = { workspace = true } smallvec = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } @@ -48,6 +47,5 @@ twox-hash = { workspace = true } nix = { workspace = true } [dev-dependencies] -serde_json = { workspace = true } serial_test = { workspace = true } tokio = { workspace = true } diff --git a/core/server_common/src/sharding/mod.rs b/core/server_common/src/sharding/mod.rs index 5435a072c3..761e41d045 100644 --- a/core/server_common/src/sharding/mod.rs +++ b/core/server_common/src/sharding/mod.rs @@ -15,13 +15,11 @@ // specific language governing permissions and limitations // under the License. -mod cpu_allocation; mod local_idx; mod namespace; mod partition_location; mod shard_id; -pub use cpu_allocation::{CpuAllocation, NumaConfig}; pub use local_idx::LocalIdx; pub use namespace::{ IggyNamespace, MAX_PARTITIONS, MAX_STREAMS, MAX_TOPICS, METADATA_CONSENSUS_NAMESPACE, diff --git a/core/shard_allocator/Cargo.toml b/core/shard_allocator/Cargo.toml index 462279526e..d44574376d 100644 --- a/core/shard_allocator/Cargo.toml +++ b/core/shard_allocator/Cargo.toml @@ -24,7 +24,7 @@ license = "Apache-2.0" publish = false [dependencies] -server_common = { workspace = true } +cpu_allocation = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } diff --git a/core/shard_allocator/src/lib.rs b/core/shard_allocator/src/lib.rs index 8d808ec27f..b02343a537 100644 --- a/core/shard_allocator/src/lib.rs +++ b/core/shard_allocator/src/lib.rs @@ -15,6 +15,16 @@ // specific language governing permissions and limitations // under the License. +//! `shard_allocator`: decide which CPU cores each shard lives on. +//! +//! The server makes many shards and wants each one to run on its own +//! core so they do not fight over CPU time. This crate reads the +//! operator's choice ([`CpuAllocation`] from the config), looks at the +//! real machine with `hwloc`, and hands back one [`ShardInfo`] per +//! shard. On Linux it also pins each shard's thread to its core and +//! pins memory to the right NUMA node, so memory stays close and fast. + +use cpu_allocation::{CpuAllocation, NumaConfig}; use hwlocality::Topology; use hwlocality::bitmap::SpecializedBitmapRef; use hwlocality::cpu::cpuset::CpuSet; @@ -22,12 +32,14 @@ use hwlocality::memory::binding::{MemoryBindingFlags, MemoryBindingPolicy}; use hwlocality::object::types::ObjectType::{self, NUMANode}; #[cfg(target_os = "linux")] use nix::{sched::sched_setaffinity, unistd::Pid}; -use server_common::sharding::{CpuAllocation, NumaConfig}; use std::collections::HashSet; use std::sync::Arc; use std::thread::available_parallelism; use tracing::info; +/// All the ways shard allocation can go wrong: machine has no NUMA, +/// hwloc cannot read the topology, the operator asked for more cores +/// than exist, or the OS refused to pin a thread or its memory. #[derive(Debug, thiserror::Error)] pub enum ShardingError { #[error("Failed to detect topology: {msg}")] @@ -56,6 +68,10 @@ pub enum ShardingError { Other { msg: String }, } +/// A snapshot of the machine's NUMA layout, read once from `hwloc`. +/// +/// Holds how many NUMA nodes there are and, for each node, how many +/// real (physical) cores and how many threads (logical cores) it has. #[derive(Debug)] pub struct NumaTopology { topology: Topology, @@ -65,6 +81,8 @@ pub struct NumaTopology { } impl NumaTopology { + /// Ask `hwloc` to read this machine's NUMA layout right now. + /// Errors if hwloc fails or the machine reports no NUMA nodes. pub fn detect() -> Result { let topology = Topology::new().map_err(|e| ShardingError::TopologyDetection { msg: e.to_string() })?; @@ -110,10 +128,13 @@ impl NumaTopology { }) } + /// How many real cores this node has. Returns `0` if no such node. pub fn physical_cores_for_node(&self, node: usize) -> usize { self.physical_cores_per_node.get(node).copied().unwrap_or(0) } + /// How many threads (logical cores) this node has, hyperthreads + /// included. Returns `0` if no such node. pub fn logical_cores_for_node(&self, node: usize) -> usize { self.logical_cores_per_node.get(node).copied().unwrap_or(0) } @@ -161,6 +182,8 @@ impl NumaTopology { } } +/// One shard's home: which CPU cores it may run on, and which NUMA +/// node its memory should sit near (`None` means do not pin memory). #[derive(Debug, Clone)] pub struct ShardInfo { pub cpu_set: HashSet, @@ -168,6 +191,8 @@ pub struct ShardInfo { } impl ShardInfo { + /// Pin the calling thread to this shard's cores. On non-Linux this + /// does nothing (no-op). Empty core set also does nothing. pub fn bind_cpu(&self) -> Result<(), ShardingError> { #[cfg(target_os = "linux")] { @@ -196,6 +221,8 @@ impl ShardInfo { Ok(()) } + /// Pin the calling thread's memory to this shard's NUMA node so + /// allocations stay local and fast. Does nothing if no node is set. pub fn bind_memory(&self) -> Result<(), ShardingError> { if let Some(node_id) = self.numa_node { let topology = Topology::new().map_err(|err| ShardingError::TopologyDetection { @@ -230,12 +257,16 @@ impl ShardInfo { } } +/// Turns the operator's [`CpuAllocation`] choice into a concrete plan +/// of shards. Reads the NUMA topology only when the choice needs it. pub struct ShardAllocator { allocation: CpuAllocation, topology: Option>, } impl ShardAllocator { + /// Build an allocator for the given choice. Only `NumaAware` reads + /// the machine topology up front; the simpler modes do not. pub fn new(allocation: &CpuAllocation) -> Result { let topology = if matches!(allocation, CpuAllocation::NumaAware(_)) { let numa_topology = NumaTopology::detect()?; @@ -251,6 +282,8 @@ impl ShardAllocator { }) } + /// Produce the final list of shards, one [`ShardInfo`] each, based + /// on the chosen [`CpuAllocation`]. This is the main entry point. pub fn to_shard_assignments(&self) -> Result, ShardingError> { match &self.allocation { CpuAllocation::All => { From 8d5e471730d92f66faa2f650b9fefaea168ef61d Mon Sep 17 00:00:00 2001 From: Jaya Kasa Date: Mon, 29 Jun 2026 22:47:39 -0400 Subject: [PATCH 4/5] chore: re-trigger CI Co-authored-by: Cursor From 373778e20444313fc79e0d375104521152d71f8f Mon Sep 17 00:00:00 2001 From: Jaya Kasa Date: Mon, 29 Jun 2026 23:01:17 -0400 Subject: [PATCH 5/5] fix(shard_allocator): add empty libm stub for musl static link Vendored hwloc references cbrt, forcing the linker to resolve -lm. musl folds math into libc and rust's musl sysroot ships no libm.a, so -lm fell through to the host glibc libm.a, whose cbrt needs glibc-only __frexp/__ldexp symbols absent on musl, breaking the aarch64-musl build. Drop an empty libm.a on the search path (musl-gated build script) so -lm resolves to nothing and cbrt is satisfied later by musl's own libc. This fixes the root cause independent of crate link ordering. Co-authored-by: Cursor --- core/shard_allocator/build.rs | 43 +++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 core/shard_allocator/build.rs diff --git a/core/shard_allocator/build.rs b/core/shard_allocator/build.rs new file mode 100644 index 0000000000..d66e93f072 --- /dev/null +++ b/core/shard_allocator/build.rs @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Vendored `hwloc` references `cbrt`, which makes the linker pull in a +// `libm`. musl folds the math functions into its `libc`, so the Rust +// musl sysroot ships no `libm.a`. Without one, `-lm` falls through to +// the host glibc's `libm.a`, whose `cbrt` needs glibc-internal +// `__frexp`/`__ldexp` symbols that do not exist on musl, and the static +// link fails. Drop an empty `libm.a` stub on the search path so `-lm` +// resolves to nothing and `cbrt` is satisfied later by musl's own +// `libc`. No effect on non-musl targets. + +use std::env; +use std::fs; +use std::path::Path; + +fn main() { + if env::var("CARGO_CFG_TARGET_ENV").as_deref() != Ok("musl") { + return; + } + + let out_dir = env::var("OUT_DIR").expect("OUT_DIR is set by cargo for build scripts"); + let stub = Path::new(&out_dir).join("libm.a"); + + // `!\n` is the canonical header of an empty `ar` archive. + fs::write(&stub, b"!\n").expect("write empty libm.a stub"); + + println!("cargo:rustc-link-search=native={out_dir}"); +}