1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
use crate::EventTypeFlags;
use dashmap::DashMap;
use futures_channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};

/// A single registered listener: a set of event type flags paired with the
/// sending half of an unbounded channel carrying values of type `T`.
#[derive(Debug)]
pub struct Listener<T> {
    /// Event type flags this listener was registered with.
    // NOTE(review): presumably used by the dispatcher to decide which events
    // are forwarded to `tx` — confirm against the code that reads this field.
    pub events: EventTypeFlags,
    /// Sending half of the listener's unbounded channel.
    pub tx: UnboundedSender<T>,
}

/// Shared state behind a `Listeners` handle: an ID counter plus the map of
/// currently registered listeners.
#[derive(Debug)]
struct ListenersRef<T> {
    /// Counter from which listener IDs are allocated.
    id: AtomicU64,
    /// Registered listeners, keyed by their allocated ID.
    listeners: DashMap<u64, Listener<T>>,
}

impl<T> Default for ListenersRef<T> {
    /// Create an empty registry whose ID counter starts at zero.
    fn default() -> Self {
        let id = AtomicU64::new(0);
        let listeners = DashMap::new();

        Self { id, listeners }
    }
}

/// Cheaply cloneable handle to a shared set of listeners.
///
/// Every clone points at the same underlying state, so a listener registered
/// through one handle is visible through all of them.
#[derive(Debug)]
pub struct Listeners<T>(Arc<ListenersRef<T>>);

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound, even though cloning only bumps the `Arc` reference count
// and never clones a `T`.
impl<T> Clone for Listeners<T> {
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}

impl<T> Listeners<T> {
    pub fn add(&self, events: EventTypeFlags) -> UnboundedReceiver<T> {
        let id = self.0.id.fetch_add(1, Ordering::Release) + 1;
        let (tx, rx) = mpsc::unbounded();

        self.0.listeners.insert(id, Listener { events, tx });

        rx
    }

    pub fn all(&self) -> &DashMap<u64, Listener<T>> {
        &self.0.listeners
    }

    pub fn remove_all(&self) {
        self.0.listeners.clear();
    }
}

impl<T> Default for Listeners<T> {
    /// Create a handle to a fresh, empty set of listeners.
    fn default() -> Self {
        let shared = Arc::new(ListenersRef::default());

        Self(shared)
    }
}