feat: add more tests & fix rc map bug

This commit is contained in:
JasterV 2025-11-28 14:32:13 +01:00
parent 84df9ac498
commit a4bd644375
2 changed files with 131 additions and 19 deletions

View file

@ -1,6 +1,9 @@
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))] #![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
use crate::{rc_map::RcMap, subscription::Subscription}; use crate::{
rc_map::{InsertError, RcMap},
subscription::Subscription,
};
use async_broadcast::{Sender, broadcast}; use async_broadcast::{Sender, broadcast};
use std::sync::Arc; use std::sync::Arc;
@ -57,21 +60,20 @@ impl EventBus {
/// Once the subscription goes out of scope and there are no more references to the given topic, /// Once the subscription goes out of scope and there are no more references to the given topic,
/// the topic will automatically be dropped from memory. /// the topic will automatically be dropped from memory.
pub fn subscribe(&self, topic: &str) -> Subscription { pub fn subscribe(&self, topic: &str) -> Subscription {
if let Some(object_ref) = self.inner.get(topic.into()) {
return Subscription::from(object_ref);
}
// When we create a channel, a single sender and receiver are created.
//
// If in the moment of the channel creation, either the sender or the receiver get dropped, the channel will immediately be closed.
//
// This is why we are not using `Subscription::from(object_ref)` in this scenario
// but rather we must make use of the receiver created or the channel will be closed.
let (tx, rx) = broadcast(self.topic_capacity); let (tx, rx) = broadcast(self.topic_capacity);
let object_ref = self.inner.insert(topic.into(), tx); match self.inner.insert(topic.into(), tx) {
Ok(object_ref) => {
Subscription::new_with_rx(object_ref, rx) // If in the moment of the channel creation, either the sender or the receiver get dropped, the channel will immediately be closed.
//
// This is why we are not using `Subscription::from(object_ref)` in this scenario
// but rather we must make use of the receiver created or the channel will be closed.
Subscription::new_with_rx(object_ref, rx)
}
// In this case we are fine with the new channel we created being dropped.
// Since a channel already exists for this key we don't need to store the receiver and we can let the channel be closed.
Err(InsertError::AlreadyExists(_key, object_ref)) => Subscription::from(object_ref),
}
} }
/// Publishes a bunch of bytes to a topic. /// Publishes a bunch of bytes to a topic.

View file

@ -6,6 +6,7 @@
//! When the references counter of a value hits 0, the whole pair is removed from the map. //! When the references counter of a value hits 0, the whole pair is removed from the map.
use dashmap::DashMap; use dashmap::DashMap;
use std::{ use std::{
fmt::Debug,
hash::Hash, hash::Hash,
sync::{ sync::{
Arc, Weak, Arc, Weak,
@ -16,7 +17,7 @@ use std::{
/// A smart reference around a key value pair. /// A smart reference around a key value pair.
/// ///
/// Once it is dropped, it will decrease the reference counter of the pair and potentially remove the pair if the counter hits 0. /// Once it is dropped, it will decrease the reference counter of the pair and potentially remove the pair if the counter hits 0.
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct ObjectRef<K, V> pub struct ObjectRef<K, V>
where where
K: Hash + Eq + Clone, K: Hash + Eq + Clone,
@ -55,6 +56,17 @@ where
} }
} }
#[derive(thiserror::Error, Debug)]
pub enum InsertError<K, V>
where
K: Hash + Eq + Clone + Debug,
{
#[error(
"An entry already exists with the given key: '{0:?}'. You must wait until all existing object references are dropped for the pair to be removed."
)]
AlreadyExists(K, ObjectRef<K, V>),
}
/// A ReferenceCountMap. /// A ReferenceCountMap.
/// ///
/// It exposes get and insert operations. /// It exposes get and insert operations.
@ -68,7 +80,7 @@ pub struct RcMap<K, V> {
impl<K, V> RcMap<K, V> impl<K, V> RcMap<K, V>
where where
K: Hash + Eq + Clone, K: Hash + Eq + Clone + Debug,
V: Clone, V: Clone,
{ {
pub fn new() -> Self { pub fn new() -> Self {
@ -99,15 +111,113 @@ where
} }
} }
pub fn insert(&self, key: K, value: V) -> ObjectRef<K, V> { /// Insert a new pair into the map.
///
/// If an entry already exists for the given key, an error will be returned.
///
/// For consistency reasons an entry must only be removed by the last `ObjectRef` being dropped.
///
/// Otherwise, we could have unrelated old `ObjectRef` instances modifying the reference count of the new entry when being dropped.
///
/// To prevent this from happening, we enforce this rule so that you must wait until all `ObjectRef` pointing to the current entry are dropped.
pub fn insert(&self, key: K, value: V) -> Result<ObjectRef<K, V>, InsertError<K, V>> {
if let Some(object_ref) = self.get(key.clone()) {
return Err(InsertError::AlreadyExists(key, object_ref));
}
let _prev = self let _prev = self
.inner .inner
.insert(key.clone(), (AtomicIsize::new(1), value.clone())); .insert(key.clone(), (AtomicIsize::new(1), value.clone()));
ObjectRef { Ok(ObjectRef {
key, key,
parent_ref: Arc::downgrade(&self.inner), parent_ref: Arc::downgrade(&self.inner),
value, value,
} })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn it_can_insert_pairs_and_get_them() {
let map = RcMap::new();
let inserted_ref = map
.insert("potatoe", "chair")
.expect("No entry should exist");
let obj_ref = map.get("potatoe").expect("A value should be inserted");
assert_eq!(obj_ref.value(), inserted_ref.value())
}
#[test]
fn it_removes_the_pair_when_the_only_existing_obj_ref_drops() {
let map = RcMap::new();
let inserted_ref = map
.insert("potatoe", "chair")
.expect("No entry should exist");
drop(inserted_ref);
let obj_ref = map.get("potatoe");
assert!(obj_ref.is_none());
}
#[test]
fn it_removes_the_pair_only_when_the_all_existing_refs_drop() {
let map = RcMap::new();
let inserted_ref = map
.insert("potatoe", "chair")
.expect("No entry should exist");
let obj_ref1 = map.get("potatoe");
let obj_ref2 = map.get("potatoe");
let obj_ref3 = map.get("potatoe");
drop(inserted_ref);
assert!(map.get("potatoe").is_some());
drop(obj_ref1);
assert!(map.get("potatoe").is_some());
drop(obj_ref2);
assert!(map.get("potatoe").is_some());
drop(obj_ref3);
assert!(map.get("potatoe").is_none());
}
#[test]
fn it_returns_an_error_if_trying_to_insert_a_key_that_already_exists() {
let map = RcMap::new();
let _ref = map
.insert("potatoe", "chair")
.expect("No entry should exist");
let result = map.insert("potatoe", "table");
assert!(matches!(result, Err(InsertError::AlreadyExists(_, _))));
}
#[test]
fn it_can_insert_a_pair_after_the_old_one_has_been_removed() {
let map = RcMap::new();
let inserted_ref = map
.insert("potatoe", "chair")
.expect("No entry should exist");
let result = map.insert("potatoe", "table");
assert!(matches!(result, Err(InsertError::AlreadyExists(_, _))));
// The error also contains an object ref so we must drop it
drop(result);
drop(inserted_ref);
let result = map.insert("potatoe", "table");
assert!(matches!(result, Ok(_)));
} }
} }