diff --git a/Cargo.lock b/Cargo.lock index a0a256d..fd66e09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,6 +106,7 @@ dependencies = [ "async-stream", "dashmap", "futures", + "rand", "thiserror", "tokio", ] @@ -199,6 +200,18 @@ dependencies = [ "slab", ] +[[package]] +name = "getrandom" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -263,6 +276,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + [[package]] name = "proc-macro2" version = "1.0.103" @@ -281,6 +303,41 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.5.18" @@ -366,8 +423,43 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +[[package]] +name = "wasip2" +version = "1.0.1+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "wit-bindgen" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" + +[[package]] +name = "zerocopy" +version = "0.8.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ea879c944afe8a2b25fef16bb4ba234f47c694565e97383b36f3a878219065c" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf955aa904d6040f70dc8e9384444cb1030aed272ba3cb09bbc4ab9e7c1f34f5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml index 70e1df5..915e6bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ thiserror = "2.0.17" async-broadcast = "0.7" futures = "0.3" async-stream = "0.3" +rand = "0.9.2" [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/src/lib.rs b/src/lib.rs index 1a067a0..bb4a54e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -111,31 +111,262 @@ impl EventBus { mod tests { use super::*; use futures::StreamExt; - - // Helper function to extract a message from the subscription - async fn get_next_message(sub: &mut Subscription) -> String { - let payload = sub.next().await.expect("Stream unexpectedly closed"); - String::from_utf8_lossy(&payload).to_string() - } + use rand::{RngCore, SeedableRng, rngs::StdRng}; #[tokio::test(flavor = "multi_thread")] - async fn test_simple_pub_sub() { + async fn test_multithreaded_pub_sub() { let event_bus = EventBus::new(); let topic = "test_simple"; - let expected_message = "Hello EventBus"; + let expected_message = b"Hello EventBus"; let mut subscription = event_bus.subscribe(topic); - let task_handle = tokio::spawn(async move { get_next_message(&mut subscription).await }); + let task_handle = tokio::spawn(async move { subscription.next().await.unwrap() }); - event_bus - .publish(topic, expected_message.as_bytes()) - .unwrap(); + event_bus.publish(topic, expected_message).unwrap(); let received = task_handle .await .expect("Failed to receive result from task"); - assert_eq!(received, expected_message); + assert_eq!(&*received, expected_message); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_publish_to_nonexistent_topic() { + let bus = EventBus::new(); + let result = bus.publish("missing_topic", b"ignored"); + // Should not error, just ignored silently + assert!(result.is_ok()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_multiple_subscribers_receive() { + let topic = "multi_subs"; + let bus = EventBus::new(); + + let mut s1 = bus.subscribe(topic); + let mut s2 = bus.subscribe(topic); + + bus.publish(topic, b"msg").unwrap(); + + let r1 = s1.next().await.unwrap(); + let r2 = s2.next().await.unwrap(); + + assert_eq!(&*r1, b"msg"); + assert_eq!(&*r2, b"msg"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_topic_removed_when_no_subscribers() { + let bus = EventBus::new(); + let topic = "temp_topic"; + + { + let mut sub = bus.subscribe(topic); + // Topic exists so publish succeeds + bus.publish(topic, b"hello").unwrap(); + // Assert the message was received + let r1 = sub.next().await.unwrap(); + assert_eq!(&*r1, b"hello"); + // The topic will be cleaned up here + } + + // The topic doesn't exist anymore → publish should silently no-op + let result = bus.publish(topic, b"nobody_listens"); + // No err: behaves like non-existent topic + assert!(result.is_ok()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_capacity_overflow() { + let topic = "overflow_test"; + let bus = EventBus::new_with_topic_capacity(1); + + let mut sub = bus.subscribe(topic); + + // Fill buffer with one message + bus.publish(topic, b"A").unwrap(); + + // Second publish should overflow → Err(CapacityOverflow) + let err = bus.publish(topic, b"B").unwrap_err(); + + matches!(err, PublishError::CapacityOverflow(_)); + + // Drain the first message to help debugging if needed + let _ = sub.next().await; + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_multiple_topics_isolation() { + let bus = EventBus::new(); + + let mut sub_a = bus.subscribe("A"); + let mut sub_b = bus.subscribe("B"); + + bus.publish("A", b"msgA").unwrap(); + bus.publish("A", b"msgC").unwrap(); + bus.publish("B", b"msgB").unwrap(); + bus.publish("B", b"msgD").unwrap(); + + let recv_a = sub_a.next().await.unwrap(); + let recv_c = sub_a.next().await.unwrap(); + let recv_b = sub_b.next().await.unwrap(); + let recv_d = sub_b.next().await.unwrap(); + + assert_eq!(&*recv_a, b"msgA"); + assert_eq!(&*recv_c, b"msgC"); + assert_eq!(&*recv_b, b"msgB"); + assert_eq!(&*recv_d, b"msgD"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn stress_test_concurrent_publishers() { + const TOPIC: &str = "stress_topic"; + const PUBLISHERS: usize = 20; + const MSGS_PER_PUBLISHER: usize = 200; + const TOTAL_MSGS: usize = PUBLISHERS * MSGS_PER_PUBLISHER; + + let bus = EventBus::new(); + let mut sub = bus.subscribe(TOPIC); + + // Deterministic RNG for reproducibility + let mut rng = StdRng::seed_from_u64(12345); + + // Pre-generate all messages and expected results + let messages: Vec> = (0..TOTAL_MSGS) + .map(|_| rng.next_u64().to_le_bytes().into()) + .collect(); + + // Spawn publisher tasks + let handles: Vec<_> = (0..PUBLISHERS) + .map(|id| { + let start = id * MSGS_PER_PUBLISHER; + let end = start + MSGS_PER_PUBLISHER; + let bus = bus.clone(); + let slice = messages[start..end].to_vec(); + + tokio::spawn(async move { + for msg in slice { + bus.publish(TOPIC, &msg).unwrap(); + } + }) + }) + .collect(); + + // Collect all messages in receiver + let mut received = Vec::new(); + + for _ in 0..TOTAL_MSGS { + let msg = sub + .next() + .await + .expect("Channel closed unexpectedly during stress test"); + + received.push(msg.to_vec()); + } + + // Ensure all send tasks complete + for h in handles { + h.await.unwrap(); + } + + // Check we got all messages, sorted by content because async order varies + let mut expected_sorted: Vec<_> = messages.clone().iter().map(|v| v.to_vec()).collect(); + expected_sorted.sort(); + received.sort(); + + assert_eq!( + received, expected_sorted, + "Message mismatch under stress load!" + ); + } + + #[tokio::test(flavor = "multi_thread")] + async fn stress_test_multiple_subscribers() { + const TOPIC: &str = "stress_multi_subs"; + const PUBLISHERS: usize = 10; + const SUBSCRIBERS: usize = 15; + const MSGS_PER_PUBLISHER: usize = 150; + const TOTAL: usize = PUBLISHERS * MSGS_PER_PUBLISHER; + + let bus = EventBus::new(); + + // Deterministic RNG seed + let mut rng = StdRng::seed_from_u64(9999); + + // Random messages corpus + let messages: Vec> = (0..TOTAL) + .map(|_| rng.next_u64().to_le_bytes().into()) + .collect(); + + // Create subscribers + let subs: Vec = (0..SUBSCRIBERS).map(|_| bus.subscribe(TOPIC)).collect(); + + // Spawn publishers + let pub_handles = (0..PUBLISHERS) + .map(|id| { + let start = id * MSGS_PER_PUBLISHER; + let end = start + MSGS_PER_PUBLISHER; + let bus = bus.clone(); + let slice = messages[start..end].to_vec(); + + tokio::spawn(async move { + for msg in slice { + bus.publish(TOPIC, &msg).unwrap(); + // Bring the task randomly back to the runtime + if rand::random::() { + tokio::task::yield_now().await; + } + } + }) + }) + .collect::>(); + + // Spawn subscriber collectors + let sub_handles = subs + .into_iter() + .map(|mut sub| { + tokio::spawn(async move { + let mut collected = Vec::with_capacity(TOTAL); + + for _ in 0..TOTAL { + let msg = sub + .next() + .await + .expect("Channel closed unexpectedly during stress test"); + + collected.push(msg.to_vec()); + } + + collected + }) + }) + .collect::>(); + + // Ensure publishers finish + for h in pub_handles { + h.await.unwrap(); + } + + // Collect all subscriber results + let mut sub_results = Vec::new(); + for h in sub_handles { + sub_results.push(h.await.unwrap()); + } + + // Validation (for each subscriber) + for mut received in sub_results { + // Asynchronous race means ordering is not enforced → sort + received.sort(); + + let mut expected: Vec<_> = messages.clone().into_iter().map(|v| v.to_vec()).collect(); + expected.sort(); + + assert_eq!( + received, expected, + "Subscriber missed or corrupted messages", + ); + } } }