1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use super::{DayLimiter, Queue};
use async_trait::async_trait;
use futures_channel::{
mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
oneshot::{self, Sender},
};
use futures_util::{sink::SinkExt, stream::StreamExt};
use std::{fmt::Debug, time::Duration};
use tokio::time::delay_for;
use tracing::{info, warn};
#[derive(Debug)]
pub struct LargeBotQueue {
buckets: Vec<UnboundedSender<Sender<()>>>,
limiter: DayLimiter,
}
impl LargeBotQueue {
pub async fn new(buckets: usize, http: &twilight_http::Client) -> Self {
let mut queues = Vec::with_capacity(buckets);
for _ in 0..buckets {
let (tx, rx) = unbounded();
tokio::spawn(async {
waiter(rx).await;
});
queues.push(tx)
}
let limiter = DayLimiter::new(http).await.expect(
"Getting the first session limits failed, \
Is network connection available?",
);
if tracing::level_enabled!(tracing::Level::INFO) {
let lock = limiter.0.lock().await;
tracing::info!(
"{}/{} identifies used before next reset in {:.2?}",
lock.current,
lock.total,
lock.next_reset
);
}
Self {
buckets: queues,
limiter,
}
}
}
async fn waiter(mut rx: UnboundedReceiver<Sender<()>>) {
const DUR: Duration = Duration::from_secs(6);
while let Some(req) = rx.next().await {
if let Err(err) = req.send(()) {
warn!(
"[LargeBotQueue/waiter] send failed with: {:?}, skipping",
err
);
}
delay_for(DUR).await;
}
}
#[async_trait]
impl Queue for LargeBotQueue {
async fn request(&self, shard_id: [u64; 2]) {
#[allow(clippy::cast_possible_truncation)]
let bucket = (shard_id[0] % (self.buckets.len() as u64)) as usize;
let (tx, rx) = oneshot::channel();
self.limiter.get().await;
if let Err(err) = self.buckets[bucket].clone().send(tx).await {
warn!("[LargeBotQueue] send failed with: {:?}, skipping", err);
return;
}
info!("Waiting for allowance on shard: {}!", shard_id[0]);
let _ = rx.await;
}
}