mirror of
https://codeberg.org/JasterV/event_bus.rs.git
synced 2026-04-26 18:10:02 +00:00
refactor lib & update README
This commit is contained in:
parent
3f9c42e7d5
commit
e59dffe74b
6 changed files with 135 additions and 142 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
|
@ -99,7 +99,7 @@ dependencies = [
|
|||
]
|
||||
|
||||
[[package]]
|
||||
name = "event_bus"
|
||||
name = "event_bus_rs"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-broadcast",
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
[package]
|
||||
name = "event_bus"
|
||||
name = "event_bus_rs"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
|
|
|
|||
11
README.md
11
README.md
|
|
@ -44,19 +44,12 @@ async fn main() {
|
|||
// Spawn a subscriber task
|
||||
tokio::spawn(async move {
|
||||
while let Some(msg) = sub.next().await {
|
||||
match msg {
|
||||
Ok(payload) => {
|
||||
println!("Received: {}", String::from_utf8_lossy(&payload));
|
||||
}
|
||||
Err(err) => {
|
||||
eprintln!("Error receiving message: {:?}", err);
|
||||
}
|
||||
}
|
||||
println!("Received: {}", String::from_utf8_lossy(&msg));
|
||||
}
|
||||
});
|
||||
|
||||
// Publish a message
|
||||
bus.publish("my_topic", b"Hello, EventBus!").await.unwrap();
|
||||
bus.publish("my_topic", b"Hello, EventBus!").unwrap();
|
||||
}
|
||||
```
|
||||
|
||||
|
|
|
|||
140
src/lib.rs
140
src/lib.rs
|
|
@ -1,137 +1,15 @@
|
|||
//! Runtime agnostic async implementation of a thread-safe event bus.
|
||||
//!
|
||||
//! It provides the following features:
|
||||
//!
|
||||
//! - Users can publish messages to a topic
|
||||
//! - Users can subscribe a topic and listen for incoming events.
|
||||
//!
|
||||
//! Messages are published as bytes, it is responsibility of the user to perform the encoding and decoding.
|
||||
use async_broadcast::{Receiver, Sender, broadcast};
|
||||
use dashmap::DashMap;
|
||||
use futures::Stream;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
sync::{
|
||||
Arc, Weak,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
|
||||
|
||||
mod subscription;
|
||||
mod topic;
|
||||
|
||||
use std::sync::Arc;
|
||||
use topic::TopicMap;
|
||||
// Re-exports
|
||||
pub use subscription::Subscription;
|
||||
|
||||
const DEFAULT_TOPIC_CAPACITY: usize = 1000;
|
||||
|
||||
type Payload = Arc<[u8]>;
|
||||
type Tx = Sender<Payload>;
|
||||
type Rx = Receiver<Payload>;
|
||||
|
||||
/// Represents a single topic.
|
||||
/// Contains information about how many subscribers it has and the inner broadcast sender & receivers
|
||||
struct Topic {
|
||||
subscribers: AtomicUsize,
|
||||
sender: Tx,
|
||||
}
|
||||
|
||||
/// A map keeping track of the existing topics.
|
||||
///
|
||||
/// It contains the logic that understands when to create or delete a topic.
|
||||
///
|
||||
/// It makes use of a concurrent map that can be safely shared between threads.
|
||||
struct TopicMap {
|
||||
inner: DashMap<String, Topic>,
|
||||
topic_capacity: usize,
|
||||
}
|
||||
|
||||
impl TopicMap {
|
||||
fn new(topic_capacity: usize) -> Self {
|
||||
Self {
|
||||
inner: DashMap::new(),
|
||||
topic_capacity,
|
||||
}
|
||||
}
|
||||
|
||||
fn get_sender(&self, topic_name: &str) -> Option<Tx> {
|
||||
self.inner.get(topic_name).map(|topic| topic.sender.clone())
|
||||
}
|
||||
|
||||
fn new_subscriber(&self, topic_name: &str) -> Rx {
|
||||
if let Some(topic) = self.inner.get(topic_name) {
|
||||
topic
|
||||
.subscribers
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
return topic.sender.new_receiver();
|
||||
}
|
||||
|
||||
let (tx, rx) = broadcast::<Payload>(self.topic_capacity);
|
||||
|
||||
let topic = Topic {
|
||||
subscribers: AtomicUsize::new(1),
|
||||
sender: tx,
|
||||
};
|
||||
|
||||
self.inner.insert(topic_name.to_string(), topic);
|
||||
|
||||
rx
|
||||
}
|
||||
|
||||
fn remove_subscriber(&self, topic_name: &str) {
|
||||
// Scope the guard so it drops before we call `remove`.
|
||||
//
|
||||
// This is done because trying to call `remove` while we hold a read guard
|
||||
// from the DashMap will result in a deadlock.
|
||||
let subscribers_count = {
|
||||
let Some(topic_ref) = self.inner.get(topic_name) else {
|
||||
return;
|
||||
};
|
||||
|
||||
topic_ref.subscribers.fetch_sub(1, Ordering::Relaxed)
|
||||
};
|
||||
|
||||
if subscribers_count <= 1 {
|
||||
let _ = self.inner.remove(topic_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Holds a subscription to a topic.
|
||||
///
|
||||
/// `Subscription` implements `Stream`, which means that the user can consume it as a stream
|
||||
/// to listen for incoming messages.
|
||||
///
|
||||
/// Given the nature of the pub-sub architecture and the fact that anyone might be able to publish
|
||||
/// to any topic at any time, the right to close the channel remains on the subscription side.
|
||||
///
|
||||
/// This means, a subscription will always remain open and it won't be closed from the publishers side.
|
||||
///
|
||||
/// The internal channel will only be closed when all the subscriptions for a given topic have been dropped.
|
||||
///
|
||||
/// When the subscription is dropped, the parent topic might be deallocated from memory if no other subscriptions to it exist.
|
||||
pub struct Subscription {
|
||||
topic: Arc<str>,
|
||||
inner_topics: Weak<TopicMap>,
|
||||
rx: Rx,
|
||||
}
|
||||
|
||||
impl Stream for Subscription {
|
||||
type Item = Payload;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
match Pin::new(&mut self.rx).poll_next(cx) {
|
||||
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Subscription {
|
||||
fn drop(&mut self) {
|
||||
if let Some(topics) = self.inner_topics.upgrade() {
|
||||
topics.remove_subscriber(&self.topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Error type returned by the `publish` method.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum PublishError {
|
||||
|
|
|
|||
47
src/subscription.rs
Normal file
47
src/subscription.rs
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
use crate::topic::TopicMap;
|
||||
use async_broadcast::Receiver;
|
||||
use futures::Stream;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
sync::{Arc, Weak},
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// Holds a subscription to a topic.
|
||||
///
|
||||
/// `Subscription` implements `Stream`, which means that the user can consume it as a stream
|
||||
/// to listen for incoming messages.
|
||||
///
|
||||
/// Given the nature of the pub-sub architecture and the fact that anyone might be able to publish
|
||||
/// to any topic at any time, the right to close the channel remains on the subscription side.
|
||||
///
|
||||
/// This means, a subscription will always remain open and it won't be closed from the publishers side.
|
||||
///
|
||||
/// The internal channel will only be closed when all the subscriptions for a given topic have been dropped.
|
||||
///
|
||||
/// When the subscription is dropped, the parent topic might be deallocated from memory if no other subscriptions to it exist.
|
||||
pub struct Subscription {
|
||||
pub(crate) topic: Arc<str>,
|
||||
pub(crate) inner_topics: Weak<TopicMap>,
|
||||
pub(crate) rx: Receiver<Arc<[u8]>>,
|
||||
}
|
||||
|
||||
impl Stream for Subscription {
|
||||
type Item = Arc<[u8]>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
match Pin::new(&mut self.rx).poll_next(cx) {
|
||||
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Subscription {
|
||||
fn drop(&mut self) {
|
||||
if let Some(topics) = self.inner_topics.upgrade() {
|
||||
topics.remove_subscriber(&self.topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
75
src/topic.rs
Normal file
75
src/topic.rs
Normal file
|
|
@ -0,0 +1,75 @@
|
|||
use async_broadcast::{Receiver, Sender, broadcast};
|
||||
use dashmap::DashMap;
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
/// Represents a single topic.
|
||||
/// Contains information about how many subscribers it has and the inner broadcast sender & receivers
|
||||
pub(crate) struct Topic {
|
||||
subscribers: AtomicUsize,
|
||||
sender: Sender<Arc<[u8]>>,
|
||||
}
|
||||
|
||||
/// A map keeping track of the existing topics.
|
||||
///
|
||||
/// It contains the logic that understands when to create or delete a topic.
|
||||
///
|
||||
/// It makes use of a concurrent map that can be safely shared between threads.
|
||||
pub(crate) struct TopicMap {
|
||||
inner: DashMap<String, Topic>,
|
||||
topic_capacity: usize,
|
||||
}
|
||||
|
||||
impl TopicMap {
|
||||
pub(crate) fn new(topic_capacity: usize) -> Self {
|
||||
Self {
|
||||
inner: DashMap::new(),
|
||||
topic_capacity,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_sender(&self, topic_name: &str) -> Option<Sender<Arc<[u8]>>> {
|
||||
self.inner.get(topic_name).map(|topic| topic.sender.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn new_subscriber(&self, topic_name: &str) -> Receiver<Arc<[u8]>> {
|
||||
if let Some(topic) = self.inner.get(topic_name) {
|
||||
topic
|
||||
.subscribers
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
return topic.sender.new_receiver();
|
||||
}
|
||||
|
||||
let (tx, rx) = broadcast::<Arc<[u8]>>(self.topic_capacity);
|
||||
|
||||
let topic = Topic {
|
||||
subscribers: AtomicUsize::new(1),
|
||||
sender: tx,
|
||||
};
|
||||
|
||||
self.inner.insert(topic_name.to_string(), topic);
|
||||
|
||||
rx
|
||||
}
|
||||
|
||||
pub(crate) fn remove_subscriber(&self, topic_name: &str) {
|
||||
// Scope the guard so it drops before we call `remove`.
|
||||
//
|
||||
// This is done because trying to call `remove` while we hold a read guard
|
||||
// from the DashMap will result in a deadlock.
|
||||
let subscribers_count = {
|
||||
let Some(topic_ref) = self.inner.get(topic_name) else {
|
||||
return;
|
||||
};
|
||||
|
||||
topic_ref.subscribers.fetch_sub(1, Ordering::Relaxed)
|
||||
};
|
||||
|
||||
if subscribers_count <= 1 {
|
||||
let _ = self.inner.remove(topic_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue