From fd0d761e1755ef02706e7d453c39a97b41185650 Mon Sep 17 00:00:00 2001 From: JasterV <49537445+JasterV@users.noreply.github.com> Date: Mon, 1 Dec 2025 12:12:10 +0100 Subject: [PATCH] refactor: make more clear internally the management of the channel --- src/lib.rs | 35 ++++++++++++++++++++--------------- src/rc_map.rs | 2 +- src/subscription.rs | 24 ++++++------------------ 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index bbde0bc..1423b36 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,13 @@ #![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))] +mod rc_map; +mod subscription; + use crate::{ rc_map::{InsertError, RcMap}, subscription::Subscription, }; -use async_broadcast::{Sender, broadcast}; +use async_broadcast::{InactiveReceiver, Sender, broadcast}; use std::sync::Arc; /// The default topic capacity, it has been set to this value @@ -12,8 +15,17 @@ use std::sync::Arc; /// set their preferred value. pub const DEFAULT_TOPIC_CAPACITY: usize = 1000; -mod rc_map; -mod subscription; +/// Utility typed used to keep at least an instance of both a sender and a receiver +/// in memory so the channel doesn't get closed. +/// +/// `async_broadcast` will drop a channel if either all the receivers or all the senders get dropped. +#[derive(Clone)] +struct Channel( + pub Sender>, + // The inactive receiver is never used but it needs to be here + // otherwise we risk the channel being closed if all other receivers are dropped. + #[allow(dead_code)] InactiveReceiver>, +); /// Error type returned by the `publish` method. #[derive(thiserror::Error, Debug)] @@ -32,7 +44,7 @@ pub enum PublishError { /// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory. #[derive(Clone)] pub struct EventBus { - inner: RcMap, Sender>>, + inner: RcMap, Channel>, topic_capacity: usize, } @@ -64,17 +76,10 @@ impl EventBus { /// the topic will automatically be dropped from memory. pub fn subscribe(&self, topic: &str) -> Subscription { let (tx, rx) = broadcast(self.topic_capacity); + let channel = Channel(tx, rx.deactivate()); - match self.inner.insert(topic.into(), tx) { - Ok(object_ref) => { - // If in the moment of the channel creation, either the sender or the receiver get dropped, the channel will immediately be closed. - // - // This is why we are not using `Subscription::from(object_ref)` in this scenario - // but rather we must make use of the receiver created or the channel will be closed. - Subscription::new_with_rx(object_ref, rx) - } - // In this case we are fine with the new channel we created being dropped. - // Since a channel already exists for this key we don't need to store the receiver and we can let the channel be closed. + match self.inner.insert(topic.into(), channel) { + Ok(object_ref) => Subscription::from(object_ref), Err(InsertError::AlreadyExists(_key, object_ref)) => Subscription::from(object_ref), } } @@ -88,7 +93,7 @@ impl EventBus { return Ok(()); }; - let tx = object_ref.value(); + let Channel(tx, _) = object_ref.value(); let result = tx.try_broadcast(Arc::from(data)); match result { diff --git a/src/rc_map.rs b/src/rc_map.rs index 361f4b2..37bacd2 100644 --- a/src/rc_map.rs +++ b/src/rc_map.rs @@ -14,7 +14,7 @@ use std::{ /// A smart reference around a key value pair. /// /// Once it is dropped, it will decrease the reference counter of the pair and potentially remove the pair if the counter hits 0. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct ObjectRef where K: Hash + Eq + Clone, diff --git a/src/subscription.rs b/src/subscription.rs index 25234b3..fdc417b 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -1,5 +1,5 @@ -use crate::rc_map::ObjectRef; -use async_broadcast::{Receiver, Sender}; +use crate::{Channel, rc_map::ObjectRef}; +use async_broadcast::Receiver; use futures::Stream; use std::{ pin::Pin, @@ -24,25 +24,13 @@ pub struct Subscription { // We need to keep the ownership of the object ref // Otherwise if the object ref gets dropped, it might cleanup the topic // And the channel would get closed - _object_ref: ObjectRef, Sender>>, + _object_ref: ObjectRef, Channel>, rx: Receiver>, } -impl Subscription { - pub(crate) fn new_with_rx( - object_ref: ObjectRef, Sender>>, - rx: Receiver>, - ) -> Self { - Self { - _object_ref: object_ref, - rx, - } - } -} - -impl From, Sender>>> for Subscription { - fn from(object_ref: ObjectRef, Sender>>) -> Self { - let tx = object_ref.value(); +impl From, Channel>> for Subscription { + fn from(object_ref: ObjectRef, Channel>) -> Self { + let Channel(tx, _) = object_ref.value(); let rx = tx.new_receiver(); Self {