From 30e498f110eb1d0ad2e0daea6dd7f21e1008f545 Mon Sep 17 00:00:00 2001 From: JasterV <49537445+JasterV@users.noreply.github.com> Date: Thu, 27 Nov 2025 02:01:49 +0100 Subject: [PATCH] first commit --- .github/dependabot.yml | 7 + .github/workflows/ci.yml | 46 ++++++ .gitignore | 1 + Cargo.lock | 341 +++++++++++++++++++++++++++++++++++++++ Cargo.toml | 14 ++ LICENSE | 21 +++ Makefile.toml | 51 ++++++ README.md | 84 ++++++++++ rust-toolchain.toml | 3 + src/lib.rs | 270 +++++++++++++++++++++++++++++++ 10 files changed, 838 insertions(+) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/ci.yml create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 Makefile.toml create mode 100644 README.md create mode 100644 rust-toolchain.toml create mode 100644 src/lib.rs diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..e5e3518 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,7 @@ +version: 2 +updates: + - package-ecosystem: "cargo" + directory: "/" + schedule: + interval: "weekly" + open-pull-requests-limit: 10 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..064a92b --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,46 @@ +name: CI + +on: + push: + branches: + - main + pull_request: + +concurrency: + group: ci-${{ github.head_ref || github.ref }} + cancel-in-progress: true + +env: + CARGO_TERM_COLOR: always + +jobs: + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - run: rustup default 1.91.1 + - run: rustup component add clippy rustfmt + - uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # ratchet:Swatinem/rust-cache@v2 + - uses: taiki-e/install-action@ae532dedd825648efd18d9c49c9a443d0398ca0a # ratchet:taiki-e/install-action@cargo-make + - uses: taiki-e/install-action@b98f5bfc2edc235d74c94cb39bd9d8cdd69dbbdf # ratchet:taiki-e/install-action@cargo-deny + - run: cargo make -p ci fmt-check + - run: cargo make -p ci clippy + - run: cargo make -p ci deny-check + - run: cargo make -p ci docs + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + - run: rustup default 1.91.1 + - uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # ratchet:Swatinem/rust-cache@v2 + - uses: taiki-e/install-action@ae532dedd825648efd18d9c49c9a443d0398ca0a # ratchet:taiki-e/install-action@cargo-make + - run: cargo make -p ci test + + alls-green: + if: always() + runs-on: ubuntu-latest + needs: + - lint + - test + steps: + - run: ${{ !contains(needs.*.result, 'failure') }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..4a0a061 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,341 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "async-broadcast" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c48ccdbf6ca6b121e0f586cbc0e73ae440e56c67c30fa0873b4e110d9c26d2b" +dependencies = [ + "event-listener", + "futures-core", +] + +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "bitflags" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + +[[package]] +name = "event_bus" +version = "0.1.0" +dependencies = [ + "async-broadcast", + "async-stream", + "dashmap", + "futures", + "thiserror", + "tokio", +] + +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "libc" +version = "0.2.177" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976" + +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + +[[package]] +name = "memchr" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro2" +version = "1.0.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "syn" +version = "2.0.111" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio" +version = "1.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" +dependencies = [ + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..e5da9bb --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "event_bus" +version = "0.1.0" +edition = "2024" + +[dependencies] +dashmap = "6.1.0" +thiserror = "2.0.17" +async-broadcast = "0.5" +futures = "0.3" +async-stream = "0.3" + +[dev-dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..755eaad --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Victor Martínez Montané + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile.toml b/Makefile.toml new file mode 100644 index 0000000..675f56d --- /dev/null +++ b/Makefile.toml @@ -0,0 +1,51 @@ +[config] +default_to_workspace = false + +[tasks.build] +description = "Build binaries" +command = "cargo" +args = ["build", "--workspace", "--all-features"] + +[tasks.clippy] +description = "Runs clippy." +clear = true +command = "cargo" +args = [ + "clippy", + "--all-targets", + "--all-features", + "--workspace", + "--", + "-D", + "warnings", + "-W", + "clippy::dbg_macro", +] + +[tasks.fmt-check] +description = "Runs the cargo rustfmt plugin during CI." +command = "cargo" +args = ["fmt", "--all", "--", "--check"] + +[tasks.deny-check] +description = "Runs the cargo deny plugin during CI." +command = "cargo" +args = ["deny", "check"] + +[tasks.test] +description = "Runs tests." +clear = true +run_task = { name = [ + "doc-tests", + "nextest", +], fork = true } + +[tasks.nextest] +description = "Runs tests without dependencies." +command = "cargo" +args = ["nextest", "run", "--no-fail-fast", "${@}"] + +[tasks.doc-tests] +description = "Run doc tests" +command = "cargo" +args = ["test", "--doc"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..6931df5 --- /dev/null +++ b/README.md @@ -0,0 +1,84 @@ +# event_bus.rs + +A **runtime-agnostic**, **async**, and **thread-safe** event bus for Rust. +Designed to be **efficient**, **simple**, and **easy to use**, allowing you to publish and subscribe to messages across threads and async tasks. + +--- + +## Features + +- **Runtime-agnostic**: works with any async runtime (Tokio, async-std, smol, etc.) +- **Thread-safe**: multiple publishers and subscribers can safely coexist +- **Async & Stream-based**: subscribers implement `futures::Stream` +- **Automatic cleanup**: topics are removed when the last subscriber drops +- **Minimal & simple API**: just `EventBus::subscribe` and `EventBus::publish` + +--- + +## Installation + +Add to your `Cargo.toml`: + +```toml +[dependencies] +event_bus_rs = "0.1.0" +futures = "0.3" +```` + +--- +## Usage Example + +```rust +use event_bus_rs::EventBus; +use futures::StreamExt; + +#[tokio::main] +async fn main() { + let bus = EventBus::builder() + .with_topic_capacity(50) + .build(); + + // Subscribe to a topic + let mut sub = bus.subscribe("my_topic"); + + // Spawn a subscriber task + tokio::spawn(async move { + while let Some(msg) = sub.next().await { + match msg { + Ok(payload) => { + println!("Received: {}", String::from_utf8_lossy(&payload)); + } + Err(err) => { + eprintln!("Error receiving message: {:?}", err); + } + } + } + }); + + // Publish a message + bus.publish("my_topic", b"Hello, EventBus!").await.unwrap(); +} +``` + +**Notes:** + +* Messages are published as `&[u8]`; encoding/decoding is the user's responsibility. +* Multiple subscribers to the same topic each get a copy of every message. +* When all subscribers of a topic are dropped, the topic is automatically cleaned up. + +--- + +## API Overview + +* `EventBus::new() -> EventBus` – create a new bus +* `EventBus::builder() -> EventBusBuilder` – create a new bus builder +* `EventBus::subscribe(&self, topic: &str) -> Subscription` – subscribe to a topic +* `EventBus::publish(&self, topic: &str, data: &[u8]) -> Result<(), PublishError>` – publish a message +* `Subscription` implements `futures::Stream>` + +--- + +## License + +MIT OR Apache-2.0 + diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..2767a11 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,3 @@ +[toolchain] +channel = "1.91" +components = ["rustfmt", "clippy", "rust-src", "rust-analyzer"] diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3b024ab --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,270 @@ +//! Runtime agnostic async implementation of a thread-safe event bus. +//! +//! It provides the following features: +//! +//! - Users can publish messages to a topic +//! - Users can subscribe a topic and listen for incoming events. +//! +//! Messages are published as bytes, it is responsibility of the user to perform the encoding and decoding. +use async_broadcast::{Receiver, Sender, broadcast}; +use dashmap::DashMap; +use futures::Stream; +use std::{ + pin::Pin, + sync::{ + Arc, Weak, + atomic::{AtomicUsize, Ordering}, + }, + task::{Context, Poll}, +}; + +const DEFAULT_TOPIC_CAPACITY: usize = 1000; + +type Payload = Arc<[u8]>; +type Tx = Sender; +type Rx = Receiver; + +/// Represents a single topic. +/// Contains information about how many subscribers it has and the inner broadcast sender & receivers +struct Topic { + subscribers: AtomicUsize, + sender: Tx, +} + +/// A map keeping track of the existing topics. +/// +/// It contains the logic that understands when to create or delete a topic. +/// +/// It makes use of a concurrent map that can be safely shared between threads. +struct TopicMap { + inner: DashMap, + topic_capacity: usize, +} + +impl TopicMap { + fn new(topic_capacity: usize) -> Self { + Self { + inner: DashMap::new(), + topic_capacity, + } + } + + fn get_sender(&self, topic_name: &str) -> Option { + self.inner.get(topic_name).map(|topic| topic.sender.clone()) + } + + fn new_subscriber(&self, topic_name: &str) -> Rx { + if let Some(topic) = self.inner.get(topic_name) { + topic + .subscribers + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + return topic.sender.new_receiver(); + } + + let (tx, rx) = broadcast::(self.topic_capacity); + + let topic = Topic { + subscribers: AtomicUsize::new(1), + sender: tx, + }; + + self.inner.insert(topic_name.to_string(), topic); + + rx + } + + fn remove_subscriber(&self, topic_name: &str) { + // Scope the guard so it drops before we call `remove`. + // + // This is done because trying to call `remove` while we hold a read guard + // from the DashMap will result in a deadlock. + let subscribers_count = { + let Some(topic_ref) = self.inner.get(topic_name) else { + return; + }; + + topic_ref.subscribers.fetch_sub(1, Ordering::Relaxed) + }; + + if subscribers_count <= 1 { + let _ = self.inner.remove(topic_name); + } + } +} + +/// Holds a subscription to a topic. +/// +/// `Subscription` implements `Stream`, which means that the user can consume it as a stream +/// to listen for incoming messages. +/// +/// Given the nature of the pub-sub architecture and the fact that anyone might be able to publish +/// to any topic at any time, the right to close the channel remains on the subscription side. +/// +/// This means, a subscription will always remain open and it won't be closed from the publishers side. +/// +/// The internal channel will only be closed when all the subscriptions for a given topic have been dropped. +/// +/// When the subscription is dropped, the parent topic might be deallocated from memory if no other subscriptions to it exist. +pub struct Subscription { + topic: Arc, + inner_topics: Weak, + rx: Rx, +} + +impl Stream for Subscription { + type Item = Payload; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.rx).poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl Drop for Subscription { + fn drop(&mut self) { + if let Some(topics) = self.inner_topics.upgrade() { + topics.remove_subscriber(&self.topic); + } + } +} + +/// Error type returned by the `publish` method. +#[derive(thiserror::Error, Debug)] +pub enum PublishError { + #[error("Failed to publish message to topic, it was unexpectedly closed: '{0}'")] + ChannelClosed(Arc), + #[error( + "Failed to publish message to topic, the topic is full and can't handle more messages: '{0}'" + )] + CapacityOverflow(Arc), +} + +/// EventBus builder. +/// +/// It is used to configure the event bus before building it. +pub struct EventBusBuilder { + /// Topics are bounded to a capacity, if the capacity is overflown, messages can't be published. + topic_capacity: usize, +} + +impl EventBusBuilder { + fn new() -> Self { + Self { + topic_capacity: DEFAULT_TOPIC_CAPACITY, + } + } + + pub fn with_topic_capacity(self, topic_capacity: usize) -> Self { + Self { topic_capacity } + } + + pub fn build(self) -> EventBus { + EventBus { + topics: Arc::new(TopicMap::new(self.topic_capacity)), + } + } +} + +/// A thread-safe event bus. +/// +/// Users can subscribe to topics and publish to them. +/// +/// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory. +#[derive(Clone)] +pub struct EventBus { + topics: Arc, +} + +impl Default for EventBus { + fn default() -> Self { + EventBus::new() + } +} + +impl EventBus { + pub fn new() -> Self { + EventBusBuilder::new().build() + } + + pub fn builder() -> EventBusBuilder { + EventBusBuilder::new() + } + + /// Subscribes to a topic and returns a `Subscription`. + /// + /// This operation will never fail, if the topic doesn't exist it gets internally created. + /// Once the subscription goes out of scope and there are no more references to the given topic, + /// the topic will automatically be dropped from memory. + pub fn subscribe(&self, topic: &str) -> Subscription { + let rx = self.topics.new_subscriber(topic); + + Subscription { + topic: topic.into(), + inner_topics: Arc::downgrade(&self.topics), + rx, + } + } + + /// Publishes a bunch of bytes to a topic. + /// + /// If the topic doesn't exist, nothing will happen and it won't be considered an error. + /// This method will only fail if, in case a topic with the given name exists, the internal `send` operation fails. + pub fn publish(&self, topic: &str, data: &[u8]) -> Result<(), PublishError> { + let Some(sx) = self.topics.get_sender(topic) else { + return Ok(()); + }; + + let result = sx.try_broadcast(Arc::from(data)); + + match result { + Ok(_) => Ok(()), + // There are no active receivers, we do not consider this an error + Err(async_broadcast::TrySendError::Inactive(_)) => Ok(()), + // The channel is closed, we return an error as this is unexpected + Err(async_broadcast::TrySendError::Closed(_)) => { + Err(PublishError::ChannelClosed(topic.into())) + } + // The channel is overflown, we return an error + Err(async_broadcast::TrySendError::Full(_)) => { + Err(PublishError::CapacityOverflow(topic.into())) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + + // Helper function to safely extract a message from a stream + async fn get_next_message(sub: &mut Subscription) -> String { + let payload = sub.next().await.expect("Stream unexpectedly closed"); + String::from_utf8_lossy(&payload).to_string() + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_simple_pub_sub() { + let event_bus = EventBus::new(); + let topic = "test_simple"; + let expected_message = "Hello EventBus"; + + let mut subscription = event_bus.subscribe(topic); + + let task_handle = tokio::spawn(async move { get_next_message(&mut subscription).await }); + + event_bus + .publish(topic, expected_message.as_bytes()) + .unwrap(); + + let received = task_handle + .await + .expect("Failed to receive result from task"); + + assert_eq!(received, expected_message); + } +}