From a4bd64437575a957fa5f9e6afd4719d8683c14c5 Mon Sep 17 00:00:00 2001 From: JasterV <49537445+JasterV@users.noreply.github.com> Date: Fri, 28 Nov 2025 14:32:13 +0100 Subject: [PATCH] feat: add more tests & fix rc map bug --- src/lib.rs | 30 +++++++------ src/rc_map.rs | 120 +++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 131 insertions(+), 19 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 46dfcb3..a561631 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,9 @@ #![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))] -use crate::{rc_map::RcMap, subscription::Subscription}; +use crate::{ + rc_map::{InsertError, RcMap}, + subscription::Subscription, +}; use async_broadcast::{Sender, broadcast}; use std::sync::Arc; @@ -57,21 +60,20 @@ impl EventBus { /// Once the subscription goes out of scope and there are no more references to the given topic, /// the topic will automatically be dropped from memory. pub fn subscribe(&self, topic: &str) -> Subscription { - if let Some(object_ref) = self.inner.get(topic.into()) { - return Subscription::from(object_ref); - } - - // When we create a channel, a single sender and receiver are created. - // - // If in the moment of the channel creation, either the sender or the receiver get dropped, the channel will immediately be closed. - // - // This is why we are not using `Subscription::from(object_ref)` in this scenario - // but rather we must make use of the receiver created or the channel will be closed. let (tx, rx) = broadcast(self.topic_capacity); - let object_ref = self.inner.insert(topic.into(), tx); - - Subscription::new_with_rx(object_ref, rx) + match self.inner.insert(topic.into(), tx) { + Ok(object_ref) => { + // If in the moment of the channel creation, either the sender or the receiver get dropped, the channel will immediately be closed. + // + // This is why we are not using `Subscription::from(object_ref)` in this scenario + // but rather we must make use of the receiver created or the channel will be closed. + Subscription::new_with_rx(object_ref, rx) + } + // In this case we are fine with the new channel we created being dropped. + // Since a channel already exists for this key we don't need to store the receiver and we can let the channel be closed. + Err(InsertError::AlreadyExists(_key, object_ref)) => Subscription::from(object_ref), + } } /// Publishes a bunch of bytes to a topic. diff --git a/src/rc_map.rs b/src/rc_map.rs index 8713ab1..b664608 100644 --- a/src/rc_map.rs +++ b/src/rc_map.rs @@ -6,6 +6,7 @@ //! When the references counter of a value hits 0, the whole pair is removed from the map. use dashmap::DashMap; use std::{ + fmt::Debug, hash::Hash, sync::{ Arc, Weak, @@ -16,7 +17,7 @@ use std::{ /// A smart reference around a key value pair. /// /// Once it is dropped, it will decrease the reference counter of the pair and potentially remove the pair if the counter hits 0. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ObjectRef where K: Hash + Eq + Clone, @@ -55,6 +56,17 @@ where } } +#[derive(thiserror::Error, Debug)] +pub enum InsertError +where + K: Hash + Eq + Clone + Debug, +{ + #[error( + "An entry already exists with the given key: '{0:?}'. You must wait until all existing object references are dropped for the pair to be removed." + )] + AlreadyExists(K, ObjectRef), +} + /// A ReferenceCountMap. /// /// It exposes get and insert operations. @@ -68,7 +80,7 @@ pub struct RcMap { impl RcMap where - K: Hash + Eq + Clone, + K: Hash + Eq + Clone + Debug, V: Clone, { pub fn new() -> Self { @@ -99,15 +111,113 @@ where } } - pub fn insert(&self, key: K, value: V) -> ObjectRef { + /// Insert a new pair into the map. + /// + /// If an entry already exists for the given key, an error will be returned. + /// + /// For consistency reasons an entry must only be removed by the last `ObjectRef` being dropped. + /// + /// Otherwise, we could have unrelated old `ObjectRef` instances modifying the reference count of the new entry when being dropped. + /// + /// To prevent this from happening, we enforce this rule so that you must wait until all `ObjectRef` pointing to the current entry are dropped. + pub fn insert(&self, key: K, value: V) -> Result, InsertError> { + if let Some(object_ref) = self.get(key.clone()) { + return Err(InsertError::AlreadyExists(key, object_ref)); + } + let _prev = self .inner .insert(key.clone(), (AtomicIsize::new(1), value.clone())); - ObjectRef { + Ok(ObjectRef { key, parent_ref: Arc::downgrade(&self.inner), value, - } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_can_insert_pairs_and_get_them() { + let map = RcMap::new(); + let inserted_ref = map + .insert("potatoe", "chair") + .expect("No entry should exist"); + let obj_ref = map.get("potatoe").expect("A value should be inserted"); + + assert_eq!(obj_ref.value(), inserted_ref.value()) + } + + #[test] + fn it_removes_the_pair_when_the_only_existing_obj_ref_drops() { + let map = RcMap::new(); + let inserted_ref = map + .insert("potatoe", "chair") + .expect("No entry should exist"); + + drop(inserted_ref); + + let obj_ref = map.get("potatoe"); + + assert!(obj_ref.is_none()); + } + + #[test] + fn it_removes_the_pair_only_when_the_all_existing_refs_drop() { + let map = RcMap::new(); + + let inserted_ref = map + .insert("potatoe", "chair") + .expect("No entry should exist"); + let obj_ref1 = map.get("potatoe"); + let obj_ref2 = map.get("potatoe"); + let obj_ref3 = map.get("potatoe"); + + drop(inserted_ref); + assert!(map.get("potatoe").is_some()); + drop(obj_ref1); + assert!(map.get("potatoe").is_some()); + drop(obj_ref2); + assert!(map.get("potatoe").is_some()); + drop(obj_ref3); + assert!(map.get("potatoe").is_none()); + } + + #[test] + fn it_returns_an_error_if_trying_to_insert_a_key_that_already_exists() { + let map = RcMap::new(); + + let _ref = map + .insert("potatoe", "chair") + .expect("No entry should exist"); + + let result = map.insert("potatoe", "table"); + + assert!(matches!(result, Err(InsertError::AlreadyExists(_, _)))); + } + + #[test] + fn it_can_insert_a_pair_after_the_old_one_has_been_removed() { + let map = RcMap::new(); + + let inserted_ref = map + .insert("potatoe", "chair") + .expect("No entry should exist"); + + let result = map.insert("potatoe", "table"); + + assert!(matches!(result, Err(InsertError::AlreadyExists(_, _)))); + + // The error also contains an object ref so we must drop it + drop(result); + drop(inserted_ref); + + let result = map.insert("potatoe", "table"); + + assert!(matches!(result, Ok(_))); } }