mirror of
https://codeberg.org/JasterV/event_bus.rs.git
synced 2026-04-26 18:10:02 +00:00
refactor: use rc map in event bus
This commit is contained in:
parent
89494b4583
commit
457816d90d
2 changed files with 21 additions and 11 deletions
18
src/lib.rs
18
src/lib.rs
|
|
@ -1,7 +1,7 @@
|
|||
#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
|
||||
|
||||
use crate::{rc_map::RcMap, subscription::Subscription};
|
||||
use async_broadcast::{Sender, broadcast};
|
||||
use async_broadcast::{Receiver, Sender, broadcast};
|
||||
use std::sync::Arc;
|
||||
|
||||
const DEFAULT_TOPIC_CAPACITY: usize = 1000;
|
||||
|
|
@ -9,6 +9,14 @@ const DEFAULT_TOPIC_CAPACITY: usize = 1000;
|
|||
mod rc_map;
|
||||
mod subscription;
|
||||
|
||||
// Wrapper around a sender and a receiver.
|
||||
//
|
||||
// This type is useful to make sure that a channel is never closed.
|
||||
// By holding always a strong reference to a Sender and a Receiver, the channel will never close.
|
||||
#[derive(Clone)]
|
||||
#[allow(dead_code)]
|
||||
struct Channel(pub Sender<Arc<[u8]>>, pub Receiver<Arc<[u8]>>);
|
||||
|
||||
/// Error type returned by the `publish` method.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum PublishError {
|
||||
|
|
@ -26,7 +34,7 @@ pub enum PublishError {
|
|||
/// When all the subscriptions to a topic get dropped, the topic itself is dropped from memory.
|
||||
#[derive(Clone)]
|
||||
pub struct EventBus {
|
||||
inner: RcMap<Arc<str>, Sender<Arc<[u8]>>>,
|
||||
inner: RcMap<Arc<str>, Channel>,
|
||||
topic_capacity: usize,
|
||||
}
|
||||
|
||||
|
|
@ -61,9 +69,9 @@ impl EventBus {
|
|||
return Subscription::from(object_ref);
|
||||
}
|
||||
|
||||
let (tx, _) = broadcast(self.topic_capacity);
|
||||
let (tx, rx) = broadcast(self.topic_capacity);
|
||||
|
||||
let object_ref = self.inner.insert(topic.into(), tx);
|
||||
let object_ref = self.inner.insert(topic.into(), Channel(tx, rx));
|
||||
|
||||
Subscription::from(object_ref)
|
||||
}
|
||||
|
|
@ -77,7 +85,7 @@ impl EventBus {
|
|||
return Ok(());
|
||||
};
|
||||
|
||||
let tx = object_ref.value();
|
||||
let Channel(tx, _) = object_ref.value();
|
||||
|
||||
let result = tx.try_broadcast(Arc::from(data));
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use crate::rc_map::ObjectRef;
|
||||
use async_broadcast::{Receiver, Sender};
|
||||
use crate::{Channel, rc_map::ObjectRef};
|
||||
use async_broadcast::Receiver;
|
||||
use futures::Stream;
|
||||
use std::{
|
||||
pin::Pin,
|
||||
|
|
@ -24,13 +24,15 @@ pub struct Subscription {
|
|||
// We need to keep the ownership of the object ref
|
||||
// Otherwise if the object ref gets dropped, it might cleanup the topic
|
||||
// And the channel would get closed
|
||||
_object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>,
|
||||
_object_ref: ObjectRef<Arc<str>, Channel>,
|
||||
rx: Receiver<Arc<[u8]>>,
|
||||
}
|
||||
|
||||
impl From<ObjectRef<Arc<str>, Sender<Arc<[u8]>>>> for Subscription {
|
||||
fn from(object_ref: ObjectRef<Arc<str>, Sender<Arc<[u8]>>>) -> Self {
|
||||
let rx = object_ref.value().new_receiver();
|
||||
impl From<ObjectRef<Arc<str>, Channel>> for Subscription {
|
||||
fn from(object_ref: ObjectRef<Arc<str>, Channel>) -> Self {
|
||||
let Channel(tx, _) = object_ref.value();
|
||||
|
||||
let rx = tx.new_receiver();
|
||||
|
||||
Self {
|
||||
_object_ref: object_ref,
|
||||
|
|
|
|||
Loading…
Reference in a new issue