refactor: remove channel type

This commit is contained in:
JasterV 2025-11-28 01:07:30 +01:00
parent 15bfcc479f
commit 84df9ac498
2 changed files with 29 additions and 21 deletions

View file

@ -1,7 +1,7 @@
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))] #![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
use crate::{rc_map::RcMap, subscription::Subscription}; use crate::{rc_map::RcMap, subscription::Subscription};
use async_broadcast::{Receiver, Sender, broadcast}; use async_broadcast::{Sender, broadcast};
use std::sync::Arc; use std::sync::Arc;
const DEFAULT_TOPIC_CAPACITY: usize = 1000; const DEFAULT_TOPIC_CAPACITY: usize = 1000;
@ -9,14 +9,6 @@ const DEFAULT_TOPIC_CAPACITY: usize = 1000;
mod rc_map; mod rc_map;
mod subscription; mod subscription;
// Wrapper around a sender and a receiver.
//
// This type is useful to make sure that a channel is never closed.
// By holding always a strong reference to a Sender and a Receiver, the channel will never close.
#[derive(Clone)]
#[allow(dead_code)]
struct Channel(pub Sender<Arc<[u8]>>, pub Receiver<Arc<[u8]>>);
/// Error type returned by the `publish` method. /// Error type returned by the `publish` method.
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum PublishError { pub enum PublishError {
@ -34,7 +26,7 @@ pub enum PublishError {
/// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory. /// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory.
#[derive(Clone)] #[derive(Clone)]
pub struct EventBus { pub struct EventBus {
inner: RcMap<Arc<str>, Channel>, inner: RcMap<Arc<str>, Sender<Arc<[u8]>>>,
topic_capacity: usize, topic_capacity: usize,
} }
@ -69,11 +61,17 @@ impl EventBus {
return Subscription::from(object_ref); return Subscription::from(object_ref);
} }
// When we create a channel, a single sender and receiver are created.
//
// If in the moment of the channel creation, either the sender or the receiver get dropped, the channel will immediately be closed.
//
// This is why we are not using `Subscription::from(object_ref)` in this scenario
// but rather we must make use of the receiver created or the channel will be closed.
let (tx, rx) = broadcast(self.topic_capacity); let (tx, rx) = broadcast(self.topic_capacity);
let object_ref = self.inner.insert(topic.into(), Channel(tx, rx)); let object_ref = self.inner.insert(topic.into(), tx);
Subscription::from(object_ref) Subscription::new_with_rx(object_ref, rx)
} }
/// Publishes a bunch of bytes to a topic. /// Publishes a bunch of bytes to a topic.
@ -85,8 +83,7 @@ impl EventBus {
return Ok(()); return Ok(());
}; };
let Channel(tx, _) = object_ref.value(); let tx = object_ref.value();
let result = tx.try_broadcast(Arc::from(data)); let result = tx.try_broadcast(Arc::from(data));
match result { match result {
@ -110,7 +107,7 @@ mod tests {
use super::*; use super::*;
use futures::StreamExt; use futures::StreamExt;
// Helper function to safely extract a message from a stream // Helper function to extract a message from the subscription
async fn get_next_message(sub: &mut Subscription) -> String { async fn get_next_message(sub: &mut Subscription) -> String {
let payload = sub.next().await.expect("Stream unexpectedly closed"); let payload = sub.next().await.expect("Stream unexpectedly closed");
String::from_utf8_lossy(&payload).to_string() String::from_utf8_lossy(&payload).to_string()

View file

@ -1,5 +1,5 @@
use crate::{Channel, rc_map::ObjectRef}; use crate::rc_map::ObjectRef;
use async_broadcast::Receiver; use async_broadcast::{Receiver, Sender};
use futures::Stream; use futures::Stream;
use std::{ use std::{
pin::Pin, pin::Pin,
@ -24,14 +24,25 @@ pub struct Subscription {
// We need to keep the ownership of the object ref // We need to keep the ownership of the object ref
// Otherwise if the object ref gets dropped, it might cleanup the topic // Otherwise if the object ref gets dropped, it might cleanup the topic
// And the channel would get closed // And the channel would get closed
_object_ref: ObjectRef<Arc<str>, Channel>, _object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>,
rx: Receiver<Arc<[u8]>>, rx: Receiver<Arc<[u8]>>,
} }
impl From<ObjectRef<Arc<str>, Channel>> for Subscription { impl Subscription {
fn from(object_ref: ObjectRef<Arc<str>, Channel>) -> Self { pub(crate) fn new_with_rx(
let Channel(tx, _) = object_ref.value(); object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>,
rx: Receiver<Arc<[u8]>>,
) -> Self {
Self {
_object_ref: object_ref,
rx,
}
}
}
impl From<ObjectRef<Arc<str>, Sender<Arc<[u8]>>>> for Subscription {
fn from(object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>) -> Self {
let tx = object_ref.value();
let rx = tx.new_receiver(); let rx = tx.new_receiver();
Self { Self {