Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ members = [
"core/connectors/sources/postgres_source",
"core/connectors/sources/random_source",
"core/consensus",
"core/cpu_allocation",
"core/harness_derive",
"core/integration",
"core/journal",
Expand All @@ -60,6 +61,7 @@ members = [
"core/server-ng",
"core/server_common",
"core/shard",
"core/shard_allocator",
"core/simulator",
"core/tools",
"examples/rust",
Expand Down Expand Up @@ -134,6 +136,7 @@ configs = { path = "core/configs", version = "0.1.0" }
configs_derive = { path = "core/configs_derive", version = "0.1.0" }
consensus = { path = "core/consensus" }
console-subscriber = "0.5.0"
cpu_allocation = { path = "core/cpu_allocation" }
crossbeam = "0.8.4"
crossfire = "3.1.16"
csv = "1.4.0"
Expand Down Expand Up @@ -279,6 +282,7 @@ server = { path = "core/server" }
server-ng = { path = "core/server-ng" }
server_common = { path = "core/server_common" }
shard = { path = "core/shard" }
shard_allocator = { path = "core/shard_allocator" }
simd-json = { version = "0.17.0", features = ["serde_impl"] }
slab = "0.4.12"
smallvec = "1.15"
Expand Down
1 change: 1 addition & 0 deletions core/configs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ publish = false

[dependencies]
configs_derive = { workspace = true }
cpu_allocation = { workspace = true }
derive_more = { workspace = true }
err_trail = { workspace = true }
figment = { workspace = true }
Expand Down
241 changes: 7 additions & 234 deletions core/configs/src/server_config/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@
// under the License.

use iggy_common::IggyDuration;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde::{Deserialize, Serialize};
use serde_with::{DisplayFromStr, serde_as};
use std::str::FromStr;
use std::time::Duration;

use configs::ConfigEnv;

// `CpuAllocation`/`NumaConfig` are pure config types and live in their own
// leaf crate so both `configs` and `shard_allocator` can share them without
// pulling each other's heavier dependency trees. Re-exported here to keep the
// `configs::sharding::*` path stable for existing callers.
pub use cpu_allocation::{CpuAllocation, NumaConfig};

/// Default capacity of the per-shard inter-shard inbox channel. Sized
/// comfortably above the consensus working set, which is roughly
/// `PIPELINE_PREPARE_QUEUE_MAX (= 8) * replica_count * directions`
Expand Down Expand Up @@ -178,235 +183,3 @@ impl Default for ShardingConfig {
}
}
}

#[derive(Debug, Clone, PartialEq, Default)]
pub enum CpuAllocation {
#[default]
All,
Count(usize),
Range(usize, usize),
NumaAware(NumaConfig),
}

/// NUMA specific configuration
#[derive(Debug, Clone, PartialEq, Default)]
pub struct NumaConfig {
/// Which NUMA nodes to use (empty = auto-detect all)
pub nodes: Vec<usize>,
/// Cores per node to use (0 = use all available)
pub cores_per_node: usize,
/// skip hyperthread sibling
pub avoid_hyperthread: bool,
}

impl CpuAllocation {
fn parse_numa(s: &str) -> Result<CpuAllocation, String> {
let params = s
.strip_prefix("numa:")
.ok_or_else(|| "Numa config must start with 'numa:'".to_string())?;

if params == "auto" {
return Ok(CpuAllocation::NumaAware(NumaConfig {
nodes: vec![],
cores_per_node: 0,
avoid_hyperthread: true,
}));
}

let mut nodes = Vec::new();
let mut cores_per_node = 0;
let mut avoid_hyperthread = true;

for param in params.split(';') {
let kv: Vec<&str> = param.split('=').collect();
if kv.len() != 2 {
return Err(format!(
"Invalid NUMA parameter: '{param}', only available: 'auto'"
));
}

match kv[0] {
"nodes" => {
nodes = kv[1]
.split(',')
.map(|n| {
n.parse::<usize>()
.map_err(|_| format!("Invalid node number: {n}"))
})
.collect::<Result<Vec<_>, _>>()?;
}
"cores" => {
cores_per_node = kv[1]
.parse::<usize>()
.map_err(|_| format!("Invalid cores value: {}", kv[1]))?;
}
"no_ht" => {
avoid_hyperthread = kv[1]
.parse::<bool>()
.map_err(|_| format!("Invalid no ht value: {}", kv[1]))?;
}
_ => {
return Err(format!(
"Unknown NUMA parameter: {}, example: numa:nodes=0;cores=4;no_ht=true",
kv[0]
));
}
}
}

Ok(CpuAllocation::NumaAware(NumaConfig {
nodes,
cores_per_node,
avoid_hyperthread,
}))
}
}

impl FromStr for CpuAllocation {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"all" => Ok(CpuAllocation::All),
s if s.starts_with("numa:") => Self::parse_numa(s),
s if s.contains("..") => {
let parts: Vec<&str> = s.split("..").collect();
if parts.len() != 2 {
return Err(format!("Invalid range format: {s}. Expected 'start..end'"));
}
let start = parts[0]
.parse::<usize>()
.map_err(|_| format!("Invalid start value: {}", parts[0]))?;
let end = parts[1]
.parse::<usize>()
.map_err(|_| format!("Invalid end value: {}", parts[1]))?;
Ok(CpuAllocation::Range(start, end))
}
s => {
let count = s
.parse::<usize>()
.map_err(|_| format!("Invalid shard count: {s}"))?;
Ok(CpuAllocation::Count(count))
}
}
}
}

impl Serialize for CpuAllocation {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
CpuAllocation::All => serializer.serialize_str("all"),
CpuAllocation::Count(n) => serializer.serialize_u64(*n as u64),
CpuAllocation::Range(start, end) => {
serializer.serialize_str(&format!("{start}..{end}"))
}
CpuAllocation::NumaAware(numa) => {
if numa.nodes.is_empty() && numa.cores_per_node == 0 {
serializer.serialize_str("numa:auto")
} else {
let nodes_str = numa
.nodes
.iter()
.map(|n| n.to_string())
.collect::<Vec<_>>()
.join(",");

let full_str = format!(
"numa:nodes={};cores={};no_ht={}",
nodes_str, numa.cores_per_node, numa.avoid_hyperthread
);

serializer.serialize_str(&full_str)
}
}
}
}
}

impl<'de> Deserialize<'de> for CpuAllocation {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum CpuAllocationHelper {
String(String),
Number(usize),
}

match CpuAllocationHelper::deserialize(deserializer)? {
CpuAllocationHelper::String(s) => {
CpuAllocation::from_str(&s).map_err(serde::de::Error::custom)
}
CpuAllocationHelper::Number(n) => Ok(CpuAllocation::Count(n)),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_all() {
assert_eq!(CpuAllocation::from_str("all").unwrap(), CpuAllocation::All);
}

#[test]
fn test_parse_count() {
assert_eq!(
CpuAllocation::from_str("4").unwrap(),
CpuAllocation::Count(4)
);
}

#[test]
fn test_parse_range() {
assert_eq!(
CpuAllocation::from_str("2..8").unwrap(),
CpuAllocation::Range(2, 8)
);
}

#[test]
fn test_parse_numa_auto() {
let result = CpuAllocation::from_str("numa:auto").unwrap();
match result {
CpuAllocation::NumaAware(numa) => {
assert!(numa.nodes.is_empty());
assert_eq!(numa.cores_per_node, 0);
assert!(numa.avoid_hyperthread);
}
_ => panic!("Expected NumaAware"),
}
}

#[test]
fn test_parse_numa_explicit() {
let result = CpuAllocation::from_str("numa:nodes=0,1;cores=4;no_ht=true").unwrap();
match result {
CpuAllocation::NumaAware(numa) => {
assert_eq!(numa.nodes, vec![0, 1]);
assert_eq!(numa.cores_per_node, 4);
assert!(numa.avoid_hyperthread);
}
_ => panic!("Expected NumaAware"),
}
}

#[test]
fn test_numa_explicit_serde_roundtrip() {
let original = CpuAllocation::NumaAware(NumaConfig {
nodes: vec![0, 1],
cores_per_node: 4,
avoid_hyperthread: true,
});
let serialized = serde_json::to_string(&original).unwrap();
let deserialized: CpuAllocation = serde_json::from_str(&serialized).unwrap();
assert_eq!(original, deserialized);
}
}
2 changes: 1 addition & 1 deletion core/configs/src/server_config/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ impl Validatable<ConfigurationError> for ShardingConfig {
Ok(())
}
// NUMA topology validation requires hwlocality (runtime dep).
// Full NUMA validation happens in server::shard_allocator at startup.
// Full NUMA validation happens in shard_allocator at startup.
CpuAllocation::NumaAware(_) => Ok(()),
}
}
Expand Down
Loading
Loading