refactor: make more clear internally the management of the channel

This commit is contained in:
JasterV 2025-12-01 12:12:10 +01:00
parent 21d7c2257a
commit fd0d761e17
3 changed files with 27 additions and 34 deletions

View file

@ -1,10 +1,13 @@
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
mod rc_map;
mod subscription;
use crate::{
rc_map::{InsertError, RcMap},
subscription::Subscription,
};
use async_broadcast::{Sender, broadcast};
use async_broadcast::{InactiveReceiver, Sender, broadcast};
use std::sync::Arc;
/// The default topic capacity, it has been set to this value
@ -12,8 +15,17 @@ use std::sync::Arc;
/// set their preferred value.
pub const DEFAULT_TOPIC_CAPACITY: usize = 1000;
mod rc_map;
mod subscription;
/// Utility typed used to keep at least an instance of both a sender and a receiver
/// in memory so the channel doesn't get closed.
///
/// `async_broadcast` will drop a channel if either all the receivers or all the senders get dropped.
#[derive(Clone)]
struct Channel(
pub Sender<Arc<[u8]>>,
// The inactive receiver is never used but it needs to be here
// otherwise we risk the channel being closed if all other receivers are dropped.
#[allow(dead_code)] InactiveReceiver<Arc<[u8]>>,
);
/// Error type returned by the `publish` method.
#[derive(thiserror::Error, Debug)]
@ -32,7 +44,7 @@ pub enum PublishError {
/// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory.
#[derive(Clone)]
pub struct EventBus {
inner: RcMap<Arc<str>, Sender<Arc<[u8]>>>,
inner: RcMap<Arc<str>, Channel>,
topic_capacity: usize,
}
@ -64,17 +76,10 @@ impl EventBus {
/// the topic will automatically be dropped from memory.
pub fn subscribe(&self, topic: &str) -> Subscription {
let (tx, rx) = broadcast(self.topic_capacity);
let channel = Channel(tx, rx.deactivate());
match self.inner.insert(topic.into(), tx) {
Ok(object_ref) => {
// If in the moment of the channel creation, either the sender or the receiver get dropped, the channel will immediately be closed.
//
// This is why we are not using `Subscription::from(object_ref)` in this scenario
// but rather we must make use of the receiver created or the channel will be closed.
Subscription::new_with_rx(object_ref, rx)
}
// In this case we are fine with the new channel we created being dropped.
// Since a channel already exists for this key we don't need to store the receiver and we can let the channel be closed.
match self.inner.insert(topic.into(), channel) {
Ok(object_ref) => Subscription::from(object_ref),
Err(InsertError::AlreadyExists(_key, object_ref)) => Subscription::from(object_ref),
}
}
@ -88,7 +93,7 @@ impl EventBus {
return Ok(());
};
let tx = object_ref.value();
let Channel(tx, _) = object_ref.value();
let result = tx.try_broadcast(Arc::from(data));
match result {

View file

@ -14,7 +14,7 @@ use std::{
/// A smart reference around a key value pair.
///
/// Once it is dropped, it will decrease the reference counter of the pair and potentially remove the pair if the counter hits 0.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct ObjectRef<K, V>
where
K: Hash + Eq + Clone,

View file

@ -1,5 +1,5 @@
use crate::rc_map::ObjectRef;
use async_broadcast::{Receiver, Sender};
use crate::{Channel, rc_map::ObjectRef};
use async_broadcast::Receiver;
use futures::Stream;
use std::{
pin::Pin,
@ -24,25 +24,13 @@ pub struct Subscription {
// We need to keep the ownership of the object ref
// Otherwise if the object ref gets dropped, it might cleanup the topic
// And the channel would get closed
_object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>,
_object_ref: ObjectRef<Arc<str>, Channel>,
rx: Receiver<Arc<[u8]>>,
}
impl Subscription {
pub(crate) fn new_with_rx(
object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>,
rx: Receiver<Arc<[u8]>>,
) -> Self {
Self {
_object_ref: object_ref,
rx,
}
}
}
impl From<ObjectRef<Arc<str>, Sender<Arc<[u8]>>>> for Subscription {
fn from(object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>) -> Self {
let tx = object_ref.value();
impl From<ObjectRef<Arc<str>, Channel>> for Subscription {
fn from(object_ref: ObjectRef<Arc<str>, Channel>) -> Self {
let Channel(tx, _) = object_ref.value();
let rx = tx.new_receiver();
Self {