mirror of
https://codeberg.org/JasterV/event_bus.rs.git
synced 2026-04-26 18:10:02 +00:00
Merge pull request #2 from JasterV/rcmap
refactor: create and RcMap abstraction
This commit is contained in:
commit
087383a598
4 changed files with 172 additions and 107 deletions
|
|
@ -34,9 +34,7 @@ use futures::StreamExt;
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let bus = EventBus::builder()
|
||||
.with_topic_capacity(50)
|
||||
.build();
|
||||
let bus = EventBus::new_with_topic_capacity(50);
|
||||
|
||||
// Subscribe to a topic
|
||||
let mut sub = bus.subscribe("my_topic");
|
||||
|
|
|
|||
110
src/lib.rs
110
src/lib.rs
|
|
@ -1,20 +1,21 @@
|
|||
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
|
||||
|
||||
use crate::subscription::Subscription;
|
||||
use async_broadcast::{Sender, broadcast};
|
||||
use dashmap::DashMap;
|
||||
use std::sync::{Arc, atomic::AtomicUsize};
|
||||
|
||||
mod subscription;
|
||||
use crate::{rc_map::RcMap, subscription::Subscription};
|
||||
use async_broadcast::{Receiver, Sender, broadcast};
|
||||
use std::sync::Arc;
|
||||
|
||||
const DEFAULT_TOPIC_CAPACITY: usize = 1000;
|
||||
|
||||
/// Represents a single topic.
|
||||
/// Contains information about how many subscribers it has and the inner broadcast sender & receivers
|
||||
pub struct Topic {
|
||||
subscribers: AtomicUsize,
|
||||
sender: Sender<Arc<[u8]>>,
|
||||
}
|
||||
mod rc_map;
|
||||
mod subscription;
|
||||
|
||||
// Wrapper around a sender and a receiver.
|
||||
//
|
||||
// This type is useful to make sure that a channel is never closed.
|
||||
// By holding always a strong reference to a Sender and a Receiver, the channel will never close.
|
||||
#[derive(Clone)]
|
||||
#[allow(dead_code)]
|
||||
struct Channel(pub Sender<Arc<[u8]>>, pub Receiver<Arc<[u8]>>);
|
||||
|
||||
/// Error type returned by the `publish` method.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
|
|
@ -27,47 +28,22 @@ pub enum PublishError {
|
|||
CapacityOverflow(Arc<str>),
|
||||
}
|
||||
|
||||
/// EventBus builder.
|
||||
///
|
||||
/// It is used to configure the event bus before building it.
|
||||
pub struct EventBusBuilder {
|
||||
/// Topics are bounded to a capacity, if the capacity is overflown, messages can't be published.
|
||||
topic_capacity: usize,
|
||||
}
|
||||
|
||||
impl EventBusBuilder {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
topic_capacity: DEFAULT_TOPIC_CAPACITY,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_topic_capacity(self, topic_capacity: usize) -> Self {
|
||||
Self { topic_capacity }
|
||||
}
|
||||
|
||||
pub fn build(self) -> EventBus {
|
||||
EventBus {
|
||||
inner: Arc::new(DashMap::new()),
|
||||
topic_capacity: self.topic_capacity,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A thread-safe event bus.
|
||||
///
|
||||
/// Users can subscribe to topics and publish to them.
|
||||
///
|
||||
/// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory.
|
||||
#[derive(Clone)]
|
||||
pub struct EventBus {
|
||||
inner: Arc<DashMap<String, Topic>>,
|
||||
inner: RcMap<Arc<str>, Channel>,
|
||||
topic_capacity: usize,
|
||||
}
|
||||
|
||||
impl Default for EventBus {
|
||||
fn default() -> Self {
|
||||
EventBus::builder().build()
|
||||
EventBus {
|
||||
inner: RcMap::new(),
|
||||
topic_capacity: DEFAULT_TOPIC_CAPACITY,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -76,54 +52,42 @@ impl EventBus {
|
|||
Self::default()
|
||||
}
|
||||
|
||||
pub fn builder() -> EventBusBuilder {
|
||||
EventBusBuilder::new()
|
||||
pub fn new_with_topic_capacity(topic_capacity: usize) -> EventBus {
|
||||
let mut bus = EventBus::new();
|
||||
bus.topic_capacity = topic_capacity;
|
||||
bus
|
||||
}
|
||||
|
||||
/// Subscribes to a topic and returns a `Subscription`.
|
||||
///
|
||||
/// This operation will never fail, if the topic doesn't exist it gets internally created.
|
||||
/// This operation will never fail: If the topic doesn't exist it gets internally created.
|
||||
///
|
||||
/// Once the subscription goes out of scope and there are no more references to the given topic,
|
||||
/// the topic will automatically be dropped from memory.
|
||||
pub fn subscribe(&self, topic_name: &str) -> Subscription {
|
||||
if let Some(topic) = self.inner.get(topic_name) {
|
||||
topic
|
||||
.subscribers
|
||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
return Subscription {
|
||||
topic: topic_name.into(),
|
||||
topics_ref: Arc::downgrade(&self.inner),
|
||||
rx: topic.sender.new_receiver(),
|
||||
};
|
||||
pub fn subscribe(&self, topic: &str) -> Subscription {
|
||||
if let Some(object_ref) = self.inner.get(topic.into()) {
|
||||
return Subscription::from(object_ref);
|
||||
}
|
||||
|
||||
let (tx, rx) = broadcast::<Arc<[u8]>>(self.topic_capacity);
|
||||
let (tx, rx) = broadcast(self.topic_capacity);
|
||||
|
||||
let topic = Topic {
|
||||
subscribers: AtomicUsize::new(1),
|
||||
sender: tx,
|
||||
};
|
||||
let object_ref = self.inner.insert(topic.into(), Channel(tx, rx));
|
||||
|
||||
self.inner.insert(topic_name.to_string(), topic);
|
||||
|
||||
Subscription {
|
||||
topic: topic_name.into(),
|
||||
topics_ref: Arc::downgrade(&self.inner),
|
||||
rx,
|
||||
}
|
||||
Subscription::from(object_ref)
|
||||
}
|
||||
|
||||
/// Publishes a bunch of bytes to a topic.
|
||||
///
|
||||
/// If the topic doesn't exist, nothing will happen and it won't be considered an error.
|
||||
/// This method will only fail if, in case a topic with the given name exists, the internal `send` operation fails.
|
||||
pub fn publish(&self, topic_name: &str, data: &[u8]) -> Result<(), PublishError> {
|
||||
let Some(topic) = self.inner.get(topic_name) else {
|
||||
pub fn publish(&self, topic: &str, data: &[u8]) -> Result<(), PublishError> {
|
||||
let Some(object_ref) = self.inner.get(topic.into()) else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let result = topic.sender.try_broadcast(Arc::from(data));
|
||||
let Channel(tx, _) = object_ref.value();
|
||||
|
||||
let result = tx.try_broadcast(Arc::from(data));
|
||||
|
||||
match result {
|
||||
Ok(_) => Ok(()),
|
||||
|
|
@ -131,11 +95,11 @@ impl EventBus {
|
|||
Err(async_broadcast::TrySendError::Inactive(_)) => Ok(()),
|
||||
// The channel is closed, we return an error as this is unexpected
|
||||
Err(async_broadcast::TrySendError::Closed(_)) => {
|
||||
Err(PublishError::ChannelClosed(topic_name.into()))
|
||||
Err(PublishError::ChannelClosed(topic.into()))
|
||||
}
|
||||
// The channel is overflown, we return an error
|
||||
Err(async_broadcast::TrySendError::Full(_)) => {
|
||||
Err(PublishError::CapacityOverflow(topic_name.into()))
|
||||
Err(PublishError::CapacityOverflow(topic.into()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
113
src/rc_map.rs
Normal file
113
src/rc_map.rs
Normal file
|
|
@ -0,0 +1,113 @@
|
|||
//! This module provides an RcMap (Or ReferenceCountMap) which is a map that keeps track of how many references of a value in a pair exist.
|
||||
//!
|
||||
//! Every time someone gets a value by key, that value's reference counter increases.
|
||||
//! When a reference to a value is dropped, the reference counter decreases.
|
||||
//!
|
||||
//! When the references counter of a value hits 0, the whole pair is removed from the map.
|
||||
use dashmap::DashMap;
|
||||
use std::{
|
||||
hash::Hash,
|
||||
sync::{
|
||||
Arc, Weak,
|
||||
atomic::{AtomicIsize, Ordering},
|
||||
},
|
||||
};
|
||||
|
||||
/// A smart reference around a key value pair.
|
||||
///
|
||||
/// Once it is dropped, it will decrease the reference counter of the pair and potentially remove the pair if the counter hits 0.
|
||||
#[derive(Clone)]
|
||||
pub struct ObjectRef<K, V>
|
||||
where
|
||||
K: Hash + Eq + Clone,
|
||||
{
|
||||
parent_ref: Weak<DashMap<K, (AtomicIsize, V)>>,
|
||||
key: K,
|
||||
value: V,
|
||||
}
|
||||
|
||||
impl<K, V> ObjectRef<K, V>
|
||||
where
|
||||
K: Hash + Eq + Clone,
|
||||
{
|
||||
pub fn value(&self) -> &V {
|
||||
&self.value
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V> Drop for ObjectRef<K, V>
|
||||
where
|
||||
K: Hash + Eq + Clone,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
let Some(map) = self.parent_ref.upgrade() else {
|
||||
return;
|
||||
};
|
||||
|
||||
map.alter(&self.key, |_, (count, value)| {
|
||||
count.fetch_sub(1, Ordering::Relaxed);
|
||||
(count, value)
|
||||
});
|
||||
|
||||
map.remove_if(&self.key, |_, (count, _)| {
|
||||
count.load(Ordering::Relaxed) <= 0
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// A ReferenceCountMap.
|
||||
///
|
||||
/// It exposes get and insert operations.
|
||||
///
|
||||
/// The insert operation always returns an `ObjectRef` to the inserted value.
|
||||
/// If that object is dropped and no other references existed to that pair, the pair is cleaned up.
|
||||
#[derive(Clone)]
|
||||
pub struct RcMap<K, V> {
|
||||
inner: Arc<DashMap<K, (AtomicIsize, V)>>,
|
||||
}
|
||||
|
||||
impl<K, V> RcMap<K, V>
|
||||
where
|
||||
K: Hash + Eq + Clone,
|
||||
V: Clone,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
inner: Arc::new(DashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self, key: K) -> Option<ObjectRef<K, V>> {
|
||||
self.inner.alter(&key, |_, (count, value)| {
|
||||
count.fetch_add(1, Ordering::Relaxed);
|
||||
(count, value)
|
||||
});
|
||||
|
||||
let option = self.inner.get(&key);
|
||||
|
||||
match option {
|
||||
Some(value_ref) => {
|
||||
let (_count, value) = value_ref.value();
|
||||
|
||||
Some(ObjectRef {
|
||||
key,
|
||||
parent_ref: Arc::downgrade(&self.inner),
|
||||
value: value.clone(),
|
||||
})
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&self, key: K, value: V) -> ObjectRef<K, V> {
|
||||
let _prev = self
|
||||
.inner
|
||||
.insert(key.clone(), (AtomicIsize::new(1), value.clone()));
|
||||
|
||||
ObjectRef {
|
||||
key,
|
||||
parent_ref: Arc::downgrade(&self.inner),
|
||||
value,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,14 +1,13 @@
|
|||
use crate::Topic;
|
||||
use crate::{Channel, rc_map::ObjectRef};
|
||||
use async_broadcast::Receiver;
|
||||
use dashmap::DashMap;
|
||||
use futures::Stream;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
sync::{Arc, Weak, atomic::Ordering},
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
/// Holds a subscription to a topic.
|
||||
/// Holds a subscription to a channel.
|
||||
///
|
||||
/// `Subscription` implements `Stream`, which means that the user can consume it as a stream
|
||||
/// to listen for incoming messages.
|
||||
|
|
@ -22,9 +21,24 @@ use std::{
|
|||
///
|
||||
/// When the subscription is dropped, the parent topic might be deallocated from memory if no other subscriptions to it exist.
|
||||
pub struct Subscription {
|
||||
pub(crate) topic: Arc<str>,
|
||||
pub(crate) topics_ref: Weak<DashMap<String, Topic>>,
|
||||
pub(crate) rx: Receiver<Arc<[u8]>>,
|
||||
// We need to keep the ownership of the object ref
|
||||
// Otherwise if the object ref gets dropped, it might cleanup the topic
|
||||
// And the channel would get closed
|
||||
_object_ref: ObjectRef<Arc<str>, Channel>,
|
||||
rx: Receiver<Arc<[u8]>>,
|
||||
}
|
||||
|
||||
impl From<ObjectRef<Arc<str>, Channel>> for Subscription {
|
||||
fn from(object_ref: ObjectRef<Arc<str>, Channel>) -> Self {
|
||||
let Channel(tx, _) = object_ref.value();
|
||||
|
||||
let rx = tx.new_receiver();
|
||||
|
||||
Self {
|
||||
_object_ref: object_ref,
|
||||
rx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for Subscription {
|
||||
|
|
@ -38,27 +52,3 @@ impl Stream for Subscription {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Subscription {
|
||||
fn drop(&mut self) {
|
||||
let Some(topics) = self.topics_ref.upgrade() else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Scope the guard so it drops before we call `remove`.
|
||||
//
|
||||
// This is done because trying to call `remove` while we hold a read guard
|
||||
// from the DashMap will result in a deadlock.
|
||||
let subscribers_count = {
|
||||
let Some(topic_ref) = topics.get(self.topic.as_ref()) else {
|
||||
return;
|
||||
};
|
||||
|
||||
topic_ref.subscribers.fetch_sub(1, Ordering::Relaxed)
|
||||
};
|
||||
|
||||
if subscribers_count <= 1 {
|
||||
let _ = topics.remove(self.topic.as_ref());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue