first commit

This commit is contained in:
JasterV 2025-11-27 02:01:49 +01:00
commit 30e498f110
10 changed files with 838 additions and 0 deletions

7
.github/dependabot.yml vendored Normal file
View file

@ -0,0 +1,7 @@
version: 2
updates:
- package-ecosystem: "cargo"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10

46
.github/workflows/ci.yml vendored Normal file
View file

@ -0,0 +1,46 @@
name: CI
on:
push:
branches:
- main
pull_request:
concurrency:
group: ci-${{ github.head_ref || github.ref }}
cancel-in-progress: true
env:
CARGO_TERM_COLOR: always
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- run: rustup default 1.91.1
- run: rustup component add clippy rustfmt
- uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # ratchet:Swatinem/rust-cache@v2
- uses: taiki-e/install-action@ae532dedd825648efd18d9c49c9a443d0398ca0a # ratchet:taiki-e/install-action@cargo-make
- uses: taiki-e/install-action@b98f5bfc2edc235d74c94cb39bd9d8cdd69dbbdf # ratchet:taiki-e/install-action@cargo-deny
- run: cargo make -p ci fmt-check
- run: cargo make -p ci clippy
- run: cargo make -p ci deny-check
- run: cargo make -p ci docs
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- run: rustup default 1.91.1
- uses: Swatinem/rust-cache@f13886b937689c021905a6b90929199931d60db1 # ratchet:Swatinem/rust-cache@v2
- uses: taiki-e/install-action@ae532dedd825648efd18d9c49c9a443d0398ca0a # ratchet:taiki-e/install-action@cargo-make
- run: cargo make -p ci test
alls-green:
if: always()
runs-on: ubuntu-latest
needs:
- lint
- test
steps:
- run: ${{ !contains(needs.*.result, 'failure') }}

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

341
Cargo.lock generated Normal file
View file

@ -0,0 +1,341 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "async-broadcast"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c48ccdbf6ca6b121e0f586cbc0e73ae440e56c67c30fa0873b4e110d9c26d2b"
dependencies = [
"event-listener",
"futures-core",
]
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "bitflags"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3"
[[package]]
name = "cfg-if"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801"
[[package]]
name = "crossbeam-utils"
version = "0.8.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "event_bus"
version = "0.1.0"
dependencies = [
"async-broadcast",
"async-stream",
"dashmap",
"futures",
"thiserror",
"tokio",
]
[[package]]
name = "futures"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
[[package]]
name = "futures-task"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-util"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "hashbrown"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "libc"
version = "0.2.177"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2874a2af47a2325c2001a6e6fad9b16a53b802102b528163885171cf92b15976"
[[package]]
name = "lock_api"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965"
dependencies = [
"scopeguard",
]
[[package]]
name = "memchr"
version = "2.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
[[package]]
name = "once_cell"
version = "1.21.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d"
[[package]]
name = "parking_lot_core"
version = "0.9.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-link",
]
[[package]]
name = "pin-project-lite"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro2"
version = "1.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
dependencies = [
"unicode-ident",
]
[[package]]
name = "quote"
version = "1.0.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f"
dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.5.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d"
dependencies = [
"bitflags",
]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "slab"
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589"
[[package]]
name = "smallvec"
version = "1.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03"
[[package]]
name = "syn"
version = "2.0.111"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "390cc9a294ab71bdb1aa2e99d13be9c753cd2d7bd6560c77118597410c4d2e87"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "thiserror"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "2.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tokio"
version = "1.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408"
dependencies = [
"pin-project-lite",
"tokio-macros",
]
[[package]]
name = "tokio-macros"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "unicode-ident"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5"
[[package]]
name = "windows-link"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"

14
Cargo.toml Normal file
View file

@ -0,0 +1,14 @@
[package]
name = "event_bus"
version = "0.1.0"
edition = "2024"
[dependencies]
dashmap = "6.1.0"
thiserror = "2.0.17"
async-broadcast = "0.5"
futures = "0.3"
async-stream = "0.3"
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Victor Martínez Montané
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

51
Makefile.toml Normal file
View file

@ -0,0 +1,51 @@
[config]
default_to_workspace = false
[tasks.build]
description = "Build binaries"
command = "cargo"
args = ["build", "--workspace", "--all-features"]
[tasks.clippy]
description = "Runs clippy."
clear = true
command = "cargo"
args = [
"clippy",
"--all-targets",
"--all-features",
"--workspace",
"--",
"-D",
"warnings",
"-W",
"clippy::dbg_macro",
]
[tasks.fmt-check]
description = "Runs the cargo rustfmt plugin during CI."
command = "cargo"
args = ["fmt", "--all", "--", "--check"]
[tasks.deny-check]
description = "Runs the cargo deny plugin during CI."
command = "cargo"
args = ["deny", "check"]
[tasks.test]
description = "Runs tests."
clear = true
run_task = { name = [
"doc-tests",
"nextest",
], fork = true }
[tasks.nextest]
description = "Runs tests without dependencies."
command = "cargo"
args = ["nextest", "run", "--no-fail-fast", "${@}"]
[tasks.doc-tests]
description = "Run doc tests"
command = "cargo"
args = ["test", "--doc"]

84
README.md Normal file
View file

@ -0,0 +1,84 @@
# event_bus.rs
A **runtime-agnostic**, **async**, and **thread-safe** event bus for Rust.
Designed to be **efficient**, **simple**, and **easy to use**, allowing you to publish and subscribe to messages across threads and async tasks.
---
## Features
- **Runtime-agnostic**: works with any async runtime (Tokio, async-std, smol, etc.)
- **Thread-safe**: multiple publishers and subscribers can safely coexist
- **Async & Stream-based**: subscribers implement `futures::Stream`
- **Automatic cleanup**: topics are removed when the last subscriber drops
- **Minimal & simple API**: just `EventBus::subscribe` and `EventBus::publish`
---
## Installation
Add to your `Cargo.toml`:
```toml
[dependencies]
event_bus_rs = "0.1.0"
futures = "0.3"
````
---
## Usage Example
```rust
use event_bus_rs::EventBus;
use futures::StreamExt;
#[tokio::main]
async fn main() {
let bus = EventBus::builder()
.with_topic_capacity(50)
.build();
// Subscribe to a topic
let mut sub = bus.subscribe("my_topic");
// Spawn a subscriber task
tokio::spawn(async move {
while let Some(msg) = sub.next().await {
match msg {
Ok(payload) => {
println!("Received: {}", String::from_utf8_lossy(&payload));
}
Err(err) => {
eprintln!("Error receiving message: {:?}", err);
}
}
}
});
// Publish a message
bus.publish("my_topic", b"Hello, EventBus!").await.unwrap();
}
```
**Notes:**
* Messages are published as `&[u8]`; encoding/decoding is the user's responsibility.
* Multiple subscribers to the same topic each get a copy of every message.
* When all subscribers of a topic are dropped, the topic is automatically cleaned up.
---
## API Overview
* `EventBus::new() -> EventBus` create a new bus
* `EventBus::builder() -> EventBusBuilder` create a new bus builder
* `EventBus::subscribe(&self, topic: &str) -> Subscription` subscribe to a topic
* `EventBus::publish(&self, topic: &str, data: &[u8]) -> Result<(), PublishError>` publish a message
* `Subscription` implements `futures::Stream<Item = Result<Payload, SubscriptionStreamRecvError>>`
---
## License
MIT OR Apache-2.0

3
rust-toolchain.toml Normal file
View file

@ -0,0 +1,3 @@
[toolchain]
channel = "1.91"
components = ["rustfmt", "clippy", "rust-src", "rust-analyzer"]

270
src/lib.rs Normal file
View file

@ -0,0 +1,270 @@
//! Runtime agnostic async implementation of a thread-safe event bus.
//!
//! It provides the following features:
//!
//! - Users can publish messages to a topic
//! - Users can subscribe a topic and listen for incoming events.
//!
//! Messages are published as bytes, it is responsibility of the user to perform the encoding and decoding.
use async_broadcast::{Receiver, Sender, broadcast};
use dashmap::DashMap;
use futures::Stream;
use std::{
pin::Pin,
sync::{
Arc, Weak,
atomic::{AtomicUsize, Ordering},
},
task::{Context, Poll},
};
const DEFAULT_TOPIC_CAPACITY: usize = 1000;
type Payload = Arc<[u8]>;
type Tx = Sender<Payload>;
type Rx = Receiver<Payload>;
/// Represents a single topic.
/// Contains information about how many subscribers it has and the inner broadcast sender & receivers
struct Topic {
subscribers: AtomicUsize,
sender: Tx,
}
/// A map keeping track of the existing topics.
///
/// It contains the logic that understands when to create or delete a topic.
///
/// It makes use of a concurrent map that can be safely shared between threads.
struct TopicMap {
inner: DashMap<String, Topic>,
topic_capacity: usize,
}
impl TopicMap {
fn new(topic_capacity: usize) -> Self {
Self {
inner: DashMap::new(),
topic_capacity,
}
}
fn get_sender(&self, topic_name: &str) -> Option<Tx> {
self.inner.get(topic_name).map(|topic| topic.sender.clone())
}
fn new_subscriber(&self, topic_name: &str) -> Rx {
if let Some(topic) = self.inner.get(topic_name) {
topic
.subscribers
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return topic.sender.new_receiver();
}
let (tx, rx) = broadcast::<Payload>(self.topic_capacity);
let topic = Topic {
subscribers: AtomicUsize::new(1),
sender: tx,
};
self.inner.insert(topic_name.to_string(), topic);
rx
}
fn remove_subscriber(&self, topic_name: &str) {
// Scope the guard so it drops before we call `remove`.
//
// This is done because trying to call `remove` while we hold a read guard
// from the DashMap will result in a deadlock.
let subscribers_count = {
let Some(topic_ref) = self.inner.get(topic_name) else {
return;
};
topic_ref.subscribers.fetch_sub(1, Ordering::Relaxed)
};
if subscribers_count <= 1 {
let _ = self.inner.remove(topic_name);
}
}
}
/// Holds a subscription to a topic.
///
/// `Subscription` implements `Stream`, which means that the user can consume it as a stream
/// to listen for incoming messages.
///
/// Given the nature of the pub-sub architecture and the fact that anyone might be able to publish
/// to any topic at any time, the right to close the channel remains on the subscription side.
///
/// This means, a subscription will always remain open and it won't be closed from the publishers side.
///
/// The internal channel will only be closed when all the subscriptions for a given topic have been dropped.
///
/// When the subscription is dropped, the parent topic might be deallocated from memory if no other subscriptions to it exist.
pub struct Subscription {
topic: Arc<str>,
inner_topics: Weak<TopicMap>,
rx: Rx,
}
impl Stream for Subscription {
type Item = Payload;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.rx).poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
impl Drop for Subscription {
fn drop(&mut self) {
if let Some(topics) = self.inner_topics.upgrade() {
topics.remove_subscriber(&self.topic);
}
}
}
/// Error type returned by the `publish` method.
#[derive(thiserror::Error, Debug)]
pub enum PublishError {
#[error("Failed to publish message to topic, it was unexpectedly closed: '{0}'")]
ChannelClosed(Arc<str>),
#[error(
"Failed to publish message to topic, the topic is full and can't handle more messages: '{0}'"
)]
CapacityOverflow(Arc<str>),
}
/// EventBus builder.
///
/// It is used to configure the event bus before building it.
pub struct EventBusBuilder {
/// Topics are bounded to a capacity, if the capacity is overflown, messages can't be published.
topic_capacity: usize,
}
impl EventBusBuilder {
fn new() -> Self {
Self {
topic_capacity: DEFAULT_TOPIC_CAPACITY,
}
}
pub fn with_topic_capacity(self, topic_capacity: usize) -> Self {
Self { topic_capacity }
}
pub fn build(self) -> EventBus {
EventBus {
topics: Arc::new(TopicMap::new(self.topic_capacity)),
}
}
}
/// A thread-safe event bus.
///
/// Users can subscribe to topics and publish to them.
///
/// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory.
#[derive(Clone)]
pub struct EventBus {
topics: Arc<TopicMap>,
}
impl Default for EventBus {
fn default() -> Self {
EventBus::new()
}
}
impl EventBus {
pub fn new() -> Self {
EventBusBuilder::new().build()
}
pub fn builder() -> EventBusBuilder {
EventBusBuilder::new()
}
/// Subscribes to a topic and returns a `Subscription`.
///
/// This operation will never fail, if the topic doesn't exist it gets internally created.
/// Once the subscription goes out of scope and there are no more references to the given topic,
/// the topic will automatically be dropped from memory.
pub fn subscribe(&self, topic: &str) -> Subscription {
let rx = self.topics.new_subscriber(topic);
Subscription {
topic: topic.into(),
inner_topics: Arc::downgrade(&self.topics),
rx,
}
}
/// Publishes a bunch of bytes to a topic.
///
/// If the topic doesn't exist, nothing will happen and it won't be considered an error.
/// This method will only fail if, in case a topic with the given name exists, the internal `send` operation fails.
pub fn publish(&self, topic: &str, data: &[u8]) -> Result<(), PublishError> {
let Some(sx) = self.topics.get_sender(topic) else {
return Ok(());
};
let result = sx.try_broadcast(Arc::from(data));
match result {
Ok(_) => Ok(()),
// There are no active receivers, we do not consider this an error
Err(async_broadcast::TrySendError::Inactive(_)) => Ok(()),
// The channel is closed, we return an error as this is unexpected
Err(async_broadcast::TrySendError::Closed(_)) => {
Err(PublishError::ChannelClosed(topic.into()))
}
// The channel is overflown, we return an error
Err(async_broadcast::TrySendError::Full(_)) => {
Err(PublishError::CapacityOverflow(topic.into()))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
// Helper function to safely extract a message from a stream
async fn get_next_message(sub: &mut Subscription) -> String {
let payload = sub.next().await.expect("Stream unexpectedly closed");
String::from_utf8_lossy(&payload).to_string()
}
#[tokio::test(flavor = "multi_thread")]
async fn test_simple_pub_sub() {
let event_bus = EventBus::new();
let topic = "test_simple";
let expected_message = "Hello EventBus";
let mut subscription = event_bus.subscribe(topic);
let task_handle = tokio::spawn(async move { get_next_message(&mut subscription).await });
event_bus
.publish(topic, expected_message.as_bytes())
.unwrap();
let received = task_handle
.await
.expect("Failed to receive result from task");
assert_eq!(received, expected_message);
}
}