chore: introduce property testing in the test suite

This commit is contained in:
JasterV 2025-12-01 01:40:43 +01:00
parent 1207ed90af
commit 9a466e976a
3 changed files with 302 additions and 143 deletions

263
Cargo.lock generated
View file

@ -36,6 +36,27 @@ dependencies = [
"syn",
]
[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "bit-set"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08807e080ed7f9d5433fa9b275196cfc35414f66a0c79d864dc51a0d825231a3"
dependencies = [
"bit-vec",
]
[[package]]
name = "bit-vec"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7"
[[package]]
name = "bitflags"
version = "2.10.0"
@ -57,6 +78,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "convert_case"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
@ -77,6 +107,40 @@ dependencies = [
"parking_lot_core",
]
[[package]]
name = "derive-ex"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bba95f299f6b9cd47f68a847eca2ae9060a2713af532dc35c342065544845407"
dependencies = [
"proc-macro2",
"quote",
"structmeta",
"syn",
]
[[package]]
name = "deunicode"
version = "1.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abd57806937c9cc163efc8ea3910e00a62e2aeb0b8119f1793a978088f8f6b04"
[[package]]
name = "either"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "errno"
version = "0.3.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
dependencies = [
"libc",
"windows-sys",
]
[[package]]
name = "event-listener"
version = "5.4.1"
@ -105,12 +169,37 @@ dependencies = [
"async-broadcast",
"async-stream",
"dashmap",
"fake",
"futures",
"rand",
"proptest",
"test-strategy",
"thiserror",
"tokio",
]
[[package]]
name = "fake"
version = "4.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2b0902eb36fbab51c14eda1c186bda119fcff91e5e4e7fc2dd2077298197ce8"
dependencies = [
"deunicode",
"either",
"rand",
]
[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "futures"
version = "0.3.31"
@ -224,6 +313,12 @@ version = "0.2.177"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976"
[[package]]
name = "linux-raw-sys"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039"
[[package]]
name = "lock_api"
version = "0.4.14"
@ -239,6 +334,15 @@ version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "once_cell"
version = "1.21.3"
@ -294,6 +398,44 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "proptest"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bee689443a2bd0a16ab0348b52ee43e3b2d1b1f931c8aa5c9f8de4c86fbe8c40"
dependencies = [
"bit-set",
"bit-vec",
"bitflags",
"num-traits",
"proptest-macro",
"rand",
"rand_chacha",
"rand_xorshift",
"regex-syntax",
"rusty-fork",
"tempfile",
"unarray",
]
[[package]]
name = "proptest-macro"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "110442e79d698ac2d5913a6b14e99474650e1a77f4af30da67aa77d1c36c2a90"
dependencies = [
"convert_case",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "1.0.42"
@ -338,6 +480,15 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rand_xorshift"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "513962919efc330f829edb2535844d1b912b0fbe2ca165d613e4e8788bb05a5a"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
@ -347,6 +498,37 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex-syntax"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "rustix"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e"
dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys",
"windows-sys",
]
[[package]]
name = "rusty-fork"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc6bf79ff24e648f6da1f8d1f011e9cac26491b619e6b9280f2b47f1774e6ee2"
dependencies = [
"fnv",
"quick-error",
"tempfile",
"wait-timeout",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
@ -365,6 +547,29 @@ version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "structmeta"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329"
dependencies = [
"proc-macro2",
"quote",
"structmeta-derive",
"syn",
]
[[package]]
name = "structmeta-derive"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "syn"
version = "2.0.111"
@ -376,6 +581,32 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "tempfile"
version = "3.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16"
dependencies = [
"fastrand",
"getrandom",
"once_cell",
"rustix",
"windows-sys",
]
[[package]]
name = "test-strategy"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b12f9683de37f9980e485167ee624bfaa0b6b04da661e98e25ef9c2669bc1b"
dependencies = [
"derive-ex",
"proc-macro2",
"quote",
"structmeta",
"syn",
]
[[package]]
name = "thiserror"
version = "2.0.17"
@ -417,12 +648,33 @@ dependencies = [
"syn",
]
[[package]]
name = "unarray"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94"
[[package]]
name = "unicode-ident"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
[[package]]
name = "unicode-segmentation"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493"
[[package]]
name = "wait-timeout"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ac3b126d3914f9849036f826e054cbabdc8519970b8998ddaf3b5bd3c65f11"
dependencies = [
"libc",
]
[[package]]
name = "wasip2"
version = "1.0.1+wasi-0.2.4"
@ -438,6 +690,15 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-sys"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc"
dependencies = [
"windows-link",
]
[[package]]
name = "wit-bindgen"
version = "0.46.0"

View file

@ -12,12 +12,14 @@ repository = "https://github.com/JasterV/event_bus.rs"
license = "MIT"
[dependencies]
dashmap = "6.1.0"
thiserror = "2.0.17"
async-broadcast = "0.7"
futures = "0.3"
async-stream = "0.3"
rand = "0.9.2"
dashmap = "6.1.0"
futures = "0.3"
thiserror = "2.0.17"
[dev-dependencies]
fake = "4.4.0"
proptest = { version = "1.9.0", features = ["attr-macro"] }
test-strategy = "0.4.3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

View file

@ -110,8 +110,11 @@ impl EventBus {
#[cfg(test)]
mod tests {
use super::*;
use fake::{Fake, Faker};
use futures::StreamExt;
use rand::{RngCore, SeedableRng, rngs::StdRng};
use futures::stream;
use proptest::prop_assert_eq;
use test_strategy::proptest;
#[tokio::test(flavor = "multi_thread")]
async fn test_multithreaded_pub_sub() {
@ -205,6 +208,7 @@ mod tests {
let mut sub_b = bus.subscribe("B");
bus.publish("A", b"msgA").unwrap();
bus.publish("A", b"msgC").unwrap();
bus.publish("B", b"msgB").unwrap();
bus.publish("B", b"msgD").unwrap();
@ -220,153 +224,45 @@ mod tests {
assert_eq!(&*recv_d, b"msgD");
}
#[tokio::test(flavor = "multi_thread")]
async fn stress_test_concurrent_publishers() {
const TOPIC: &str = "stress_topic";
const PUBLISHERS: usize = 20;
const MSGS_PER_PUBLISHER: usize = 200;
const TOTAL_MSGS: usize = PUBLISHERS * MSGS_PER_PUBLISHER;
let bus = EventBus::new();
let mut sub = bus.subscribe(TOPIC);
// Deterministic RNG for reproducibility
let mut rng = StdRng::seed_from_u64(12345);
// Pre-generate all messages and expected results
let messages: Vec<Arc<[u8]>> = (0..TOTAL_MSGS)
.map(|_| rng.next_u64().to_le_bytes().into())
.collect();
// Spawn publisher tasks
let handles: Vec<_> = (0..PUBLISHERS)
.map(|id| {
let start = id * MSGS_PER_PUBLISHER;
let end = start + MSGS_PER_PUBLISHER;
let bus = bus.clone();
let slice = messages[start..end].to_vec();
tokio::spawn(async move {
for msg in slice {
bus.publish(TOPIC, &msg).unwrap();
}
})
})
.collect();
// Collect all messages in receiver
let mut received = Vec::new();
for _ in 0..TOTAL_MSGS {
let msg = sub
.next()
.await
.expect("Channel closed unexpectedly during stress test");
received.push(msg.to_vec());
}
// Ensure all send tasks complete
for h in handles {
h.await.unwrap();
}
// Check we got all messages, sorted by content because async order varies
let mut expected_sorted: Vec<_> = messages.clone().iter().map(|v| v.to_vec()).collect();
expected_sorted.sort();
received.sort();
assert_eq!(
received, expected_sorted,
"Message mismatch under stress load!"
);
}
#[tokio::test(flavor = "multi_thread")]
async fn stress_test_multiple_subscribers() {
#[proptest(async = "tokio")]
async fn stress_test_multiple_subscribers_prop(
#[strategy(1usize..100)] subscribers: usize,
#[strategy(20usize..1_000)] total_msgs: usize,
) {
const TOPIC: &str = "stress_multi_subs";
const PUBLISHERS: usize = 10;
const SUBSCRIBERS: usize = 15;
const MSGS_PER_PUBLISHER: usize = 150;
const TOTAL: usize = PUBLISHERS * MSGS_PER_PUBLISHER;
let bus = EventBus::new_with_topic_capacity(total_msgs);
let bus = EventBus::new();
// Deterministic RNG seed
let mut rng = StdRng::seed_from_u64(9999);
// Random messages corpus
let messages: Vec<Arc<[u8]>> = (0..TOTAL)
.map(|_| rng.next_u64().to_le_bytes().into())
// Generate messages to publish
let mut messages: Vec<Arc<[u8]>> = (0..total_msgs)
.map(|_| Faker.fake::<u64>().to_le_bytes().into())
.collect();
// Create subscribers
let subs: Vec<Subscription> = (0..SUBSCRIBERS).map(|_| bus.subscribe(TOPIC)).collect();
// Register all subscribers
let subs: Vec<_> = (0..subscribers).map(|_| bus.subscribe(TOPIC)).collect();
// Spawn publishers
let pub_handles = (0..PUBLISHERS)
.map(|id| {
let start = id * MSGS_PER_PUBLISHER;
let end = start + MSGS_PER_PUBLISHER;
let bus = bus.clone();
let slice = messages[start..end].to_vec();
// Run all subscribers concurrently
let subs_handles = tokio::spawn(async move {
stream::iter(subs)
.map(|sub| sub.take(total_msgs).collect::<Vec<_>>())
.buffer_unordered(subscribers)
.collect::<Vec<_>>()
.await
});
tokio::spawn(async move {
for msg in slice {
bus.publish(TOPIC, &msg).unwrap();
// Bring the task randomly back to the runtime
if rand::random::<bool>() {
tokio::task::yield_now().await;
}
}
})
})
.collect::<Vec<_>>();
// Spawn subscriber collectors
let sub_handles = subs
.into_iter()
.map(|mut sub| {
tokio::spawn(async move {
let mut collected = Vec::with_capacity(TOTAL);
for _ in 0..TOTAL {
let msg = sub
.next()
.await
.expect("Channel closed unexpectedly during stress test");
collected.push(msg.to_vec());
}
collected
})
})
.collect::<Vec<_>>();
// Ensure publishers finish
for h in pub_handles {
h.await.unwrap();
// Publish all messages
for msg in &messages {
bus.publish(TOPIC, msg.as_ref()).unwrap();
}
// Collect all subscriber results
let mut sub_results = Vec::new();
for h in sub_handles {
sub_results.push(h.await.unwrap());
}
// Wait for all subscribers to complete
let sub_results = subs_handles.await.unwrap();
// Validation (for each subscriber)
for mut received in sub_results {
// Asynchronous race means ordering is not enforced → sort
received.sort();
// Sort input messages to be able to assert against the results
messages.sort();
let mut expected: Vec<_> = messages.clone().into_iter().map(|v| v.to_vec()).collect();
expected.sort();
assert_eq!(
received, expected,
"Subscriber missed or corrupted messages",
);
for mut result in sub_results {
result.sort();
prop_assert_eq!(&result, &messages, "subscriber missed or altered messages");
}
}
}