refactor: build rc map

This commit is contained in:
JasterV 2025-11-27 22:51:37 +01:00
parent 0f4c1c7c2e
commit 89494b4583
4 changed files with 121 additions and 131 deletions

View file

@ -34,9 +34,7 @@ use futures::StreamExt;
#[tokio::main]
async fn main() {
let bus = EventBus::builder()
.with_topic_capacity(50)
.build();
let bus = EventBus::new_with_topic_capacity(50);
// Subscribe to a topic
let mut sub = bus.subscribe("my_topic");

View file

@ -1,20 +1,13 @@
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
use crate::subscription::Subscription;
use crate::{rc_map::RcMap, subscription::Subscription};
use async_broadcast::{Sender, broadcast};
use dashmap::DashMap;
use std::sync::{Arc, atomic::AtomicUsize};
mod subscription;
use std::sync::Arc;
const DEFAULT_TOPIC_CAPACITY: usize = 1000;
/// Represents a single topic.
/// Contains information about how many subscribers it has and the inner broadcast sender & receivers
pub struct Topic {
subscribers: AtomicUsize,
sender: Sender<Arc<[u8]>>,
}
mod rc_map;
mod subscription;
/// Error type returned by the `publish` method.
#[derive(thiserror::Error, Debug)]
@ -27,47 +20,22 @@ pub enum PublishError {
CapacityOverflow(Arc<str>),
}
/// EventBus builder.
///
/// It is used to configure the event bus before building it.
pub struct EventBusBuilder {
/// Topics are bounded to a capacity, if the capacity is overflown, messages can't be published.
topic_capacity: usize,
}
impl EventBusBuilder {
fn new() -> Self {
Self {
topic_capacity: DEFAULT_TOPIC_CAPACITY,
}
}
pub fn with_topic_capacity(self, topic_capacity: usize) -> Self {
Self { topic_capacity }
}
pub fn build(self) -> EventBus {
EventBus {
inner: Arc::new(DashMap::new()),
topic_capacity: self.topic_capacity,
}
}
}
/// A thread-safe event bus.
///
/// Users can subscribe to topics and publish to them.
///
/// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory.
#[derive(Clone)]
pub struct EventBus {
inner: Arc<DashMap<String, Topic>>,
inner: RcMap<Arc<str>, Sender<Arc<[u8]>>>,
topic_capacity: usize,
}
impl Default for EventBus {
fn default() -> Self {
EventBus::builder().build()
EventBus {
inner: RcMap::new(),
topic_capacity: DEFAULT_TOPIC_CAPACITY,
}
}
}
@ -76,54 +44,42 @@ impl EventBus {
Self::default()
}
pub fn builder() -> EventBusBuilder {
EventBusBuilder::new()
pub fn new_with_topic_capacity(topic_capacity: usize) -> EventBus {
let mut bus = EventBus::new();
bus.topic_capacity = topic_capacity;
bus
}
/// Subscribes to a topic and returns a `Subscription`.
///
/// This operation will never fail, if the topic doesn't exist it gets internally created.
/// This operation will never fail: If the topic doesn't exist it gets internally created.
///
/// Once the subscription goes out of scope and there are no more references to the given topic,
/// the topic will automatically be dropped from memory.
pub fn subscribe(&self, topic_name: &str) -> Subscription {
if let Some(topic) = self.inner.get(topic_name) {
topic
.subscribers
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return Subscription {
topic: topic_name.into(),
topics_ref: Arc::downgrade(&self.inner),
rx: topic.sender.new_receiver(),
};
pub fn subscribe(&self, topic: &str) -> Subscription {
if let Some(object_ref) = self.inner.get(topic.into()) {
return Subscription::from(object_ref);
}
let (tx, rx) = broadcast::<Arc<[u8]>>(self.topic_capacity);
let (tx, _) = broadcast(self.topic_capacity);
let topic = Topic {
subscribers: AtomicUsize::new(1),
sender: tx,
};
let object_ref = self.inner.insert(topic.into(), tx);
self.inner.insert(topic_name.to_string(), topic);
Subscription {
topic: topic_name.into(),
topics_ref: Arc::downgrade(&self.inner),
rx,
}
Subscription::from(object_ref)
}
/// Publishes a bunch of bytes to a topic.
///
/// If the topic doesn't exist, nothing will happen and it won't be considered an error.
/// This method will only fail if, in case a topic with the given name exists, the internal `send` operation fails.
pub fn publish(&self, topic_name: &str, data: &[u8]) -> Result<(), PublishError> {
let Some(topic) = self.inner.get(topic_name) else {
pub fn publish(&self, topic: &str, data: &[u8]) -> Result<(), PublishError> {
let Some(object_ref) = self.inner.get(topic.into()) else {
return Ok(());
};
let result = topic.sender.try_broadcast(Arc::from(data));
let tx = object_ref.value();
let result = tx.try_broadcast(Arc::from(data));
match result {
Ok(_) => Ok(()),
@ -131,11 +87,11 @@ impl EventBus {
Err(async_broadcast::TrySendError::Inactive(_)) => Ok(()),
// The channel is closed, we return an error as this is unexpected
Err(async_broadcast::TrySendError::Closed(_)) => {
Err(PublishError::ChannelClosed(topic_name.into()))
Err(PublishError::ChannelClosed(topic.into()))
}
// The channel is overflown, we return an error
Err(async_broadcast::TrySendError::Full(_)) => {
Err(PublishError::CapacityOverflow(topic_name.into()))
Err(PublishError::CapacityOverflow(topic.into()))
}
}
}

View file

@ -1,37 +1,60 @@
/// Work in progress
///
/// The goal is to build a map that maintains a reference counts of its pairs.
///
/// Once a pair doesn't have any more objects referencing it, the pair gets automatically removed.
use dashmap::DashMap;
use std::{
hash::Hash,
sync::{Arc, atomic::AtomicUsize},
sync::{
Arc, Weak,
atomic::{AtomicIsize, Ordering},
},
};
use dashmap::DashMap;
pub struct Object<T> {
count: AtomicUsize,
elem: T,
#[derive(Clone)]
pub struct ObjectRef<K, V>
where
K: Hash + Eq + Clone,
{
parent_ref: Weak<DashMap<K, (AtomicIsize, V)>>,
key: K,
value: V,
}
impl<T> Object<T> {
pub fn new(value: T) -> Self {
Self {
count: AtomicUsize::new(1),
elem: value,
}
impl<K, V> ObjectRef<K, V>
where
K: Hash + Eq + Clone,
{
pub fn value(&self) -> &V {
&self.value
}
}
impl<K, V> Drop for ObjectRef<K, V>
where
K: Hash + Eq + Clone,
{
fn drop(&mut self) {
let Some(map) = self.parent_ref.upgrade() else {
return;
};
map.alter(&self.key, |_, (count, value)| {
count.fetch_sub(1, Ordering::Relaxed);
(count, value)
});
map.remove_if(&self.key, |_, (count, _)| {
count.load(Ordering::Relaxed) <= 0
});
}
}
#[derive(Clone)]
pub struct RcMap<K, V> {
inner: Arc<DashMap<K, Object<V>>>,
inner: Arc<DashMap<K, (AtomicIsize, V)>>,
}
impl<K, V> RcMap<K, V>
where
K: Hash + Eq,
K: Hash + Eq + Clone,
V: Clone,
{
pub fn new() -> Self {
Self {
@ -39,12 +62,37 @@ where
}
}
pub fn get(&self, key: K) -> Option<Object<V>> {
// TODO
todo!()
pub fn get(&self, key: K) -> Option<ObjectRef<K, V>> {
self.inner.alter(&key, |_, (count, value)| {
count.fetch_add(1, Ordering::Relaxed);
(count, value)
});
let option = self.inner.get(&key);
match option {
Some(value_ref) => {
let (_count, value) = value_ref.value();
Some(ObjectRef {
key,
parent_ref: Arc::downgrade(&self.inner),
value: value.clone(),
})
}
None => None,
}
}
pub fn insert(&self, key: K, value: V) -> Option<Object<V>> {
todo!()
pub fn insert(&self, key: K, value: V) -> ObjectRef<K, V> {
let _prev = self
.inner
.insert(key.clone(), (AtomicIsize::new(1), value.clone()));
ObjectRef {
key,
parent_ref: Arc::downgrade(&self.inner),
value,
}
}
}

View file

@ -1,14 +1,13 @@
use crate::Topic;
use async_broadcast::Receiver;
use dashmap::DashMap;
use crate::rc_map::ObjectRef;
use async_broadcast::{Receiver, Sender};
use futures::Stream;
use std::{
pin::Pin,
sync::{Arc, Weak, atomic::Ordering},
sync::Arc,
task::{Context, Poll},
};
/// Holds a subscription to a topic.
/// Holds a subscription to a channel.
///
/// `Subscription` implements `Stream`, which means that the user can consume it as a stream
/// to listen for incoming messages.
@ -22,9 +21,22 @@ use std::{
///
/// When the subscription is dropped, the parent topic might be deallocated from memory if no other subscriptions to it exist.
pub struct Subscription {
pub(crate) topic: Arc<str>,
pub(crate) topics_ref: Weak<DashMap<String, Topic>>,
pub(crate) rx: Receiver<Arc<[u8]>>,
// We need to keep the ownership of the object ref
// Otherwise if the object ref gets dropped, it might cleanup the topic
// And the channel would get closed
_object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>,
rx: Receiver<Arc<[u8]>>,
}
impl From<ObjectRef<Arc<str>, Sender<Arc<[u8]>>>> for Subscription {
fn from(object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>) -> Self {
let rx = object_ref.value().new_receiver();
Self {
_object_ref: object_ref,
rx,
}
}
}
impl Stream for Subscription {
@ -38,27 +50,3 @@ impl Stream for Subscription {
}
}
}
impl Drop for Subscription {
fn drop(&mut self) {
let Some(topics) = self.topics_ref.upgrade() else {
return;
};
// Scope the guard so it drops before we call `remove`.
//
// This is done because trying to call `remove` while we hold a read guard
// from the DashMap will result in a deadlock.
let subscribers_count = {
let Some(topic_ref) = topics.get(self.topic.as_ref()) else {
return;
};
topic_ref.subscribers.fetch_sub(1, Ordering::Relaxed)
};
if subscribers_count <= 1 {
let _ = topics.remove(self.topic.as_ref());
}
}
}