1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
mod day_limiter;
mod large_bot_queue;
pub use large_bot_queue::LargeBotQueue;
use async_trait::async_trait;
use day_limiter::DayLimiter;
use futures_channel::{
mpsc::{unbounded, UnboundedReceiver, UnboundedSender},
oneshot::{self, Sender},
};
use futures_util::{sink::SinkExt, stream::StreamExt};
use std::{fmt::Debug, time::Duration};
use tokio::time::delay_for;
#[allow(unused_imports)]
use tracing::{info, warn};
/// Abstraction over a queue that callers await before they proceed.
///
/// The implementation visible in this file, [`LocalQueue`], resolves
/// [`Queue::request`] once its background task releases the caller.
/// Implementors must be `Debug + Send + Sync`.
#[async_trait]
pub trait Queue: Debug + Send + Sync {
/// Waits until the queue allows the caller to continue.
///
/// NOTE(review): `shard_id` presumably carries `[shard index, shard total]`;
/// confirm against the callers. `LocalQueue` ignores the value entirely.
async fn request(&self, shard_id: [u64; 2]);
}
/// [`Queue`] implementation driven by an in-process background task.
///
/// Wraps the sending half of an unbounded channel. Every item pushed into the
/// channel is the oneshot sender of one pending request; the `waiter` task
/// fires those senders to release the requests. Cloning produces another
/// handle to the same channel.
#[derive(Clone, Debug)]
pub struct LocalQueue(UnboundedSender<Sender<()>>);
impl Default for LocalQueue {
fn default() -> Self {
Self::new()
}
}
impl LocalQueue {
pub fn new() -> Self {
let (tx, rx) = unbounded();
tokio::spawn(async {
waiter(rx).await;
});
Self(tx)
}
}
/// Background task that releases queued requests one at a time.
///
/// Every item received is the oneshot sender belonging to one pending
/// request. Firing it unblocks that request; the task then sleeps for `DUR`
/// before serving the next item, so at most one request is released per
/// `DUR`.
///
/// If the requester has already dropped its receiving half, nothing was
/// released: the failure is logged and the next item is served immediately
/// instead of stalling the whole queue for an unused slot.
///
/// Returns once the receiver's stream ends.
async fn waiter(mut rx: UnboundedReceiver<Sender<()>>) {
    // NOTE(review): 6s presumably leaves a margin over the upstream rate
    // limit window — confirm against the service's documentation.
    const DUR: Duration = Duration::from_secs(6);
    while let Some(req) = rx.next().await {
        match req.send(()) {
            // A request was actually released: hold the slot for `DUR`.
            Ok(()) => delay_for(DUR).await,
            // Receiver is gone, so no slot was consumed; skip the delay.
            Err(err) => {
                warn!("[LocalQueue/waiter] send failed with: {:?}, skipping", err);
            }
        }
    }
}
#[async_trait]
impl Queue for LocalQueue {
    /// Enqueues a oneshot sender with the background task and waits for it
    /// to be fired.
    ///
    /// The shard id argument is ignored. If enqueueing fails, a warning is
    /// logged and the call returns without waiting. A cancelled oneshot
    /// (its sender dropped without firing) is ignored and the call returns.
    async fn request(&self, _: [u64; 2]) {
        let (allow_tx, allow_rx) = oneshot::channel();
        // `SinkExt::send` needs `&mut` access, hence the cheap handle clone.
        let enqueued = self.0.clone().send(allow_tx).await;
        match enqueued {
            Ok(()) => {
                info!("Waiting for allowance!");
                // Cancellation is deliberately not treated as an error.
                let _ = allow_rx.await;
            }
            Err(err) => {
                warn!("[LocalQueue] send failed with: {:?}, skipping", err);
            }
        }
    }
}