From 84df9ac4987710c110814528cf39a9b12a590945 Mon Sep 17 00:00:00 2001 From: JasterV <49537445+JasterV@users.noreply.github.com> Date: Fri, 28 Nov 2025 01:07:30 +0100 Subject: [PATCH] refactor: remove channel type --- src/lib.rs | 27 ++++++++++++--------------- src/subscription.rs | 23 +++++++++++++++++------ 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index be65039..46dfcb3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ #![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))] use crate::{rc_map::RcMap, subscription::Subscription}; -use async_broadcast::{Receiver, Sender, broadcast}; +use async_broadcast::{Sender, broadcast}; use std::sync::Arc; const DEFAULT_TOPIC_CAPACITY: usize = 1000; @@ -9,14 +9,6 @@ const DEFAULT_TOPIC_CAPACITY: usize = 1000; mod rc_map; mod subscription; -// Wrapper around a sender and a receiver. -// -// This type is useful to make sure that a channel is never closed. -// By holding always a strong reference to a Sender and a Receiver, the channel will never close. -#[derive(Clone)] -#[allow(dead_code)] -struct Channel(pub Sender>, pub Receiver>); - /// Error type returned by the `publish` method. #[derive(thiserror::Error, Debug)] pub enum PublishError { @@ -34,7 +26,7 @@ pub enum PublishError { /// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory. #[derive(Clone)] pub struct EventBus { - inner: RcMap, Channel>, + inner: RcMap, Sender>>, topic_capacity: usize, } @@ -69,11 +61,17 @@ impl EventBus { return Subscription::from(object_ref); } + // When we create a channel, a single sender and receiver are created. + // + // If in the moment of the channel creation, either the sender or the receiver get dropped, the channel will immediately be closed. + // + // This is why we are not using `Subscription::from(object_ref)` in this scenario + // but rather we must make use of the receiver created or the channel will be closed. let (tx, rx) = broadcast(self.topic_capacity); - let object_ref = self.inner.insert(topic.into(), Channel(tx, rx)); + let object_ref = self.inner.insert(topic.into(), tx); - Subscription::from(object_ref) + Subscription::new_with_rx(object_ref, rx) } /// Publishes a bunch of bytes to a topic. @@ -85,8 +83,7 @@ impl EventBus { return Ok(()); }; - let Channel(tx, _) = object_ref.value(); - + let tx = object_ref.value(); let result = tx.try_broadcast(Arc::from(data)); match result { @@ -110,7 +107,7 @@ mod tests { use super::*; use futures::StreamExt; - // Helper function to safely extract a message from a stream + // Helper function to extract a message from the subscription async fn get_next_message(sub: &mut Subscription) -> String { let payload = sub.next().await.expect("Stream unexpectedly closed"); String::from_utf8_lossy(&payload).to_string() diff --git a/src/subscription.rs b/src/subscription.rs index a8ed6e8..25234b3 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -1,5 +1,5 @@ -use crate::{Channel, rc_map::ObjectRef}; -use async_broadcast::Receiver; +use crate::rc_map::ObjectRef; +use async_broadcast::{Receiver, Sender}; use futures::Stream; use std::{ pin::Pin, @@ -24,14 +24,25 @@ pub struct Subscription { // We need to keep the ownership of the object ref // Otherwise if the object ref gets dropped, it might cleanup the topic // And the channel would get closed - _object_ref: ObjectRef, Channel>, + _object_ref: ObjectRef, Sender>>, rx: Receiver>, } -impl From, Channel>> for Subscription { - fn from(object_ref: ObjectRef, Channel>) -> Self { - let Channel(tx, _) = object_ref.value(); +impl Subscription { + pub(crate) fn new_with_rx( + object_ref: ObjectRef, Sender>>, + rx: Receiver>, + ) -> Self { + Self { + _object_ref: object_ref, + rx, + } + } +} +impl From, Sender>>> for Subscription { + fn from(object_ref: ObjectRef, Sender>>) -> Self { + let tx = object_ref.value(); let rx = tx.new_receiver(); Self {