1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use crate::{
listener::{Listener, Listeners},
EventTypeFlags,
};
#[allow(unused_imports)]
use tracing::{debug, info, trace, warn};
use twilight_model::gateway::event::{shard::Payload, Event};
/// Forwards a raw gateway payload to every listener subscribed to
/// [`EventTypeFlags::SHARD_PAYLOAD`].
///
/// Each interested listener receives its own copy of `bytes`, wrapped in an
/// [`Event::ShardPayload`]. Listeners that did not opt into shard payloads
/// are skipped without copying anything.
///
/// Delivery is best-effort: a failed send is ignored here and the listener is
/// left registered.
///
/// Note that although this function is `async`, its body contains no await
/// points.
pub async fn bytes(listeners: Listeners<Event>, bytes: &[u8]) {
    for entry in listeners.all() {
        // Only listeners that explicitly asked for raw payloads get one.
        if !entry.events.contains(EventTypeFlags::SHARD_PAYLOAD) {
            continue;
        }

        let payload = Payload {
            bytes: bytes.to_vec(),
        };

        // The send result is intentionally discarded.
        let _ = entry.tx.unbounded_send(Event::ShardPayload(payload));
    }
}
/// Dispatches `event` to every registered listener that subscribed to its
/// event type, then unregisters any listener whose channel rejected the send.
///
/// The event is cloned for each interested listener except when the final
/// entry of the iteration is interested, in which case the original value is
/// moved into that listener's channel to save one clone.
///
/// # Parameters
///
/// - `listeners`: the listener registry to dispatch to.
/// - `event`: the event to deliver; consumed by this call.
pub fn event(listeners: &Listeners<Event>, event: Event) {
    let listeners = listeners.all();

    // The event type does not change between listeners, so derive it once.
    let event_type = EventTypeFlags::from(event.kind());

    let mut remove_listeners = Vec::new();

    // Held in an `Option` so the value can be moved out for the final entry
    // while still being cloneable for every entry before it.
    let mut pending = Some(event);

    {
        // Detect the final entry by peeking at the iterator itself rather than
        // comparing an index against the map's current length: the map can be
        // modified through a shared reference (see `remove` below), so a
        // length read mid-iteration may not describe the entries this
        // iterator will actually yield, which could end the loop early and
        // skip listeners.
        let mut entries = listeners.iter().peekable();

        while let Some(guard) = entries.next() {
            let id = *guard.key();
            let listener = guard.value();

            // Filter before cloning so uninterested listeners cost nothing.
            if !listener.events.contains(event_type) {
                trace!(
                    "[ShardProcessor] Listener {} doesn't want event type {:?}",
                    id,
                    event_type,
                );
                continue;
            }

            let outgoing = if entries.peek().is_some() {
                pending.clone()
            } else {
                pending.take()
            };

            if let Some(outgoing) = outgoing {
                if !_emit_to_listener(id, listener, outgoing) {
                    remove_listeners.push(id);
                }
            }
        }

        // NOTE(review): the iterator (and anything it still borrows from the
        // map) is dropped at the end of this block, before any removal below.
        // Presumably the map is a sharded concurrent map where removing while
        // an iterator is alive could block - confirm against the map's docs.
    }

    for id in &remove_listeners {
        debug!("[ShardProcessor] Removing listener {}", id);
        listeners.remove(id);
    }
}
/// Sends `event` to a single listener if that listener subscribed to the
/// event's type.
///
/// `id` is used only for the trace log message.
///
/// Returns `false` only when the listener wanted the event and the send on
/// its channel failed. Returns `true` when the send succeeded, and also when
/// the listener is not subscribed to this event type (in which case nothing
/// is sent).
fn _emit_to_listener(id: u64, listener: &Listener<Event>, event: Event) -> bool {
    let wanted = EventTypeFlags::from(event.kind());

    if listener.events.contains(wanted) {
        return listener.tx.unbounded_send(event).is_ok();
    }

    // Not subscribed: report success so the caller keeps the listener.
    trace!(
        "[ShardProcessor] Listener {} doesn't want event type {:?}",
        id,
        wanted,
    );

    true
}