1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use crate::{
    listener::{Listener, Listeners},
    EventTypeFlags,
};
#[allow(unused_imports)]
use tracing::{debug, info, trace, warn};
use twilight_model::gateway::event::{shard::Payload, Event};

pub async fn bytes(listeners: Listeners<Event>, bytes: &[u8]) {
    for listener in listeners.all() {
        if listener.events.contains(EventTypeFlags::SHARD_PAYLOAD) {
            let event = Event::ShardPayload(Payload {
                bytes: bytes.to_owned(),
            });

            // If the channel isn't active, this'll be caught by event emissions
            // later.
            let _ = listener.tx.unbounded_send(event);
        }
    }
}

pub fn event(listeners: &Listeners<Event>, event: Event) {
    let listeners = listeners.all();
    let mut remove_listeners = Vec::new();

    // Take up to the last one so that we can later get the last and *move*
    // the event into the listener's channel, rather than clone it like we
    // do here.
    //
    // This avoids a clone, and for users with only 1 listener this will
    // entirely avoid cloning.
    let mut last = None;

    for (idx, guard) in listeners.iter().enumerate() {
        let id = *guard.key();
        let listener = guard.value();
        if idx == listeners.len() - 1 {
            last = Some(*guard.key());

            break;
        }

        let event_type = EventTypeFlags::from(event.kind());

        if !listener.events.contains(event_type) {
            trace!(
                "[ShardProcessor] Listener {} doesn't want event type {:?}",
                id,
                event_type,
            );

            continue;
        }

        if !_emit_to_listener(id, listener, event.clone()) {
            remove_listeners.push(id);
        }
    }

    if let Some(id) = last {
        if let Some(listener) = listeners.get(&id) {
            if !_emit_to_listener(id, listener.value(), event) {
                remove_listeners.push(id);
            }
        }
    }

    for id in &remove_listeners {
        debug!("[ShardProcessor] Removing listener {}", id);

        listeners.remove(id);
    }
}

/// Returns whether the channel is still active.
///
/// If the receiver dropped, return `false` so we know to remove it.
/// These are unbounded channels, so we know it's not because it's full.
fn _emit_to_listener(id: u64, listener: &Listener<Event>, event: Event) -> bool {
    let event_type = EventTypeFlags::from(event.kind());

    if !listener.events.contains(event_type) {
        trace!(
            "[ShardProcessor] Listener {} doesn't want event type {:?}",
            id,
            event_type,
        );

        return true;
    }

    listener.tx.unbounded_send(event).is_ok()
}