refactor lib

This commit is contained in:
JasterV 2025-11-27 18:17:40 +01:00
parent e59dffe74b
commit 11b389a5ec
3 changed files with 69 additions and 100 deletions

View file

@ -1,15 +1,21 @@
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
mod subscription;
mod topic;
use crate::subscription::Subscription;
use async_broadcast::{Sender, broadcast};
use dashmap::DashMap;
use std::sync::{Arc, atomic::AtomicUsize};
use std::sync::Arc;
use topic::TopicMap;
// Re-exports
pub use subscription::Subscription;
mod subscription;
const DEFAULT_TOPIC_CAPACITY: usize = 1000;
/// Represents a single topic.
/// Contains information about how many subscribers it has and the inner broadcast sender & receivers
pub struct Topic {
subscribers: AtomicUsize,
sender: Sender<Arc<[u8]>>,
}
/// Error type returned by the `publish` method.
#[derive(thiserror::Error, Debug)]
pub enum PublishError {
@ -42,7 +48,8 @@ impl EventBusBuilder {
pub fn build(self) -> EventBus {
EventBus {
topics: Arc::new(TopicMap::new(self.topic_capacity)),
inner: Arc::new(DashMap::new()),
topic_capacity: self.topic_capacity,
}
}
}
@ -54,18 +61,19 @@ impl EventBusBuilder {
/// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory.
#[derive(Clone)]
pub struct EventBus {
topics: Arc<TopicMap>,
inner: Arc<DashMap<String, Topic>>,
topic_capacity: usize,
}
impl Default for EventBus {
fn default() -> Self {
EventBus::new()
EventBus::builder().build()
}
}
impl EventBus {
pub fn new() -> Self {
EventBusBuilder::new().build()
pub fn new() -> EventBus {
Self::default()
}
pub fn builder() -> EventBusBuilder {
@ -77,12 +85,31 @@ impl EventBus {
/// This operation will never fail, if the topic doesn't exist it gets internally created.
/// Once the subscription goes out of scope and there are no more references to the given topic,
/// the topic will automatically be dropped from memory.
pub fn subscribe(&self, topic: &str) -> Subscription {
let rx = self.topics.new_subscriber(topic);
pub fn subscribe(&self, topic_name: &str) -> Subscription {
if let Some(topic) = self.inner.get(topic_name) {
topic
.subscribers
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Subscription {
topic: topic_name.into(),
topics_ref: Arc::downgrade(&self.inner),
rx: topic.sender.new_receiver(),
};
}
let (tx, rx) = broadcast::<Arc<[u8]>>(self.topic_capacity);
let topic = Topic {
subscribers: AtomicUsize::new(1),
sender: tx,
};
self.inner.insert(topic_name.to_string(), topic);
Subscription {
topic: topic.into(),
inner_topics: Arc::downgrade(&self.topics),
topic: topic_name.into(),
topics_ref: Arc::downgrade(&self.inner),
rx,
}
}
@ -91,12 +118,12 @@ impl EventBus {
///
/// If the topic doesn't exist, nothing will happen and it won't be considered an error.
/// This method will only fail if, in case a topic with the given name exists, the internal `send` operation fails.
pub fn publish(&self, topic: &str, data: &[u8]) -> Result<(), PublishError> {
let Some(sx) = self.topics.get_sender(topic) else {
pub fn publish(&self, topic_name: &str, data: &[u8]) -> Result<(), PublishError> {
let Some(topic) = self.inner.get(topic_name) else {
return Ok(());
};
let result = sx.try_broadcast(Arc::from(data));
let result = topic.sender.try_broadcast(Arc::from(data));
match result {
Ok(_) => Ok(()),
@ -104,11 +131,11 @@ impl EventBus {
Err(async_broadcast::TrySendError::Inactive(_)) => Ok(()),
// The channel is closed, we return an error as this is unexpected
Err(async_broadcast::TrySendError::Closed(_)) => {
Err(PublishError::ChannelClosed(topic.into()))
Err(PublishError::ChannelClosed(topic_name.into()))
}
// The channel is overflown, we return an error
Err(async_broadcast::TrySendError::Full(_)) => {
Err(PublishError::CapacityOverflow(topic.into()))
Err(PublishError::CapacityOverflow(topic_name.into()))
}
}
}

View file

@ -1,9 +1,10 @@
use crate::topic::TopicMap;
use crate::Topic;
use async_broadcast::Receiver;
use dashmap::DashMap;
use futures::Stream;
use std::{
pin::Pin,
sync::{Arc, Weak},
sync::{Arc, Weak, atomic::Ordering},
task::{Context, Poll},
};
@ -22,7 +23,7 @@ use std::{
/// When the subscription is dropped, the parent topic might be deallocated from memory if no other subscriptions to it exist.
pub struct Subscription {
pub(crate) topic: Arc<str>,
pub(crate) inner_topics: Weak<TopicMap>,
pub(crate) topics_ref: Weak<DashMap<String, Topic>>,
pub(crate) rx: Receiver<Arc<[u8]>>,
}
@ -40,8 +41,24 @@ impl Stream for Subscription {
impl Drop for Subscription {
fn drop(&mut self) {
if let Some(topics) = self.inner_topics.upgrade() {
topics.remove_subscriber(&self.topic);
let Some(topics) = self.topics_ref.upgrade() else {
return;
};
// Scope the guard so it drops before we call `remove`.
//
// This is done because trying to call `remove` while we hold a read guard
// from the DashMap will result in a deadlock.
let subscribers_count = {
let Some(topic_ref) = topics.get(self.topic.as_ref()) else {
return;
};
topic_ref.subscribers.fetch_sub(1, Ordering::Relaxed)
};
if subscribers_count <= 1 {
let _ = topics.remove(self.topic.as_ref());
}
}
}

View file

@ -1,75 +0,0 @@
use async_broadcast::{Receiver, Sender, broadcast};
use dashmap::DashMap;
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
/// Represents a single topic.
/// Contains information about how many subscribers it has and the inner broadcast sender & receivers
pub(crate) struct Topic {
subscribers: AtomicUsize,
sender: Sender<Arc<[u8]>>,
}
/// A map keeping track of the existing topics.
///
/// It contains the logic that understands when to create or delete a topic.
///
/// It makes use of a concurrent map that can be safely shared between threads.
pub(crate) struct TopicMap {
inner: DashMap<String, Topic>,
topic_capacity: usize,
}
impl TopicMap {
pub(crate) fn new(topic_capacity: usize) -> Self {
Self {
inner: DashMap::new(),
topic_capacity,
}
}
pub(crate) fn get_sender(&self, topic_name: &str) -> Option<Sender<Arc<[u8]>>> {
self.inner.get(topic_name).map(|topic| topic.sender.clone())
}
pub(crate) fn new_subscriber(&self, topic_name: &str) -> Receiver<Arc<[u8]>> {
if let Some(topic) = self.inner.get(topic_name) {
topic
.subscribers
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return topic.sender.new_receiver();
}
let (tx, rx) = broadcast::<Arc<[u8]>>(self.topic_capacity);
let topic = Topic {
subscribers: AtomicUsize::new(1),
sender: tx,
};
self.inner.insert(topic_name.to_string(), topic);
rx
}
pub(crate) fn remove_subscriber(&self, topic_name: &str) {
// Scope the guard so it drops before we call `remove`.
//
// This is done because trying to call `remove` while we hold a read guard
// from the DashMap will result in a deadlock.
let subscribers_count = {
let Some(topic_ref) = self.inner.get(topic_name) else {
return;
};
topic_ref.subscribers.fetch_sub(1, Ordering::Relaxed)
};
if subscribers_count <= 1 {
let _ = self.inner.remove(topic_name);
}
}
}