1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::future::Either;
6use futures_util::{stream, Stream, StreamExt, TryStreamExt};
7use pin_project_lite::pin_project;
8use serde::de::DeserializeOwned;
9
10use crate::filter::Filter;
11use crate::routing::Route;
12use crate::types::List;
13use crate::{Modio, Result};
14
15pub struct Query<T> {
17 modio: Modio,
18 route: Route,
19 filter: Filter,
20 phantom: PhantomData<T>,
21}
22
23impl<T> Query<T> {
24 pub(crate) fn new(modio: Modio, route: Route, filter: Filter) -> Self {
25 Self {
26 modio,
27 route,
28 filter,
29 phantom: PhantomData,
30 }
31 }
32}
33
34impl<T: DeserializeOwned + Send> Query<T> {
35 pub async fn first(mut self) -> Result<Option<T>> {
37 self.filter = self.filter.limit(1);
38 let list = self.first_page().await;
39 list.map(|l| l.into_iter().next())
40 }
41
42 pub async fn first_page(self) -> Result<Vec<T>> {
44 let list = self.paged().await?.map_ok(|p| p.0.data).try_next().await;
45 list.map(Option::unwrap_or_default)
46 }
47
48 pub async fn collect(self) -> Result<Vec<T>> {
50 self.paged().await?.map_ok(|p| p.0.data).try_concat().await
51 }
52
53 #[allow(clippy::iter_not_returning_iterator)]
91 pub async fn iter(self) -> Result<impl Stream<Item = Result<T>>> {
92 let (st, (total, _)) = stream(self.modio, self.route, self.filter).await?;
93 let st = st
94 .map_ok(|list| stream::iter(list.into_iter().map(Ok)))
95 .try_flatten();
96 Ok(Box::pin(ResultStream::new(total as usize, st)))
97 }
98
99 pub async fn paged(self) -> Result<impl Stream<Item = Result<Page<T>>>> {
130 let (st, (total, limit)) = stream(self.modio, self.route, self.filter).await?;
131 let size_hint = if total == 0 {
132 0
133 } else {
134 (total - 1) / limit + 1
135 };
136 Ok(Box::pin(ResultStream::new(size_hint as usize, st)))
137 }
138}
139
140async fn stream<T>(
141 modio: Modio,
142 route: Route,
143 filter: Filter,
144) -> Result<(impl Stream<Item = Result<Page<T>>>, (u32, u32))>
145where
146 T: DeserializeOwned + Send,
147{
148 struct State {
149 offset: u32,
150 limit: u32,
151 remaining: u32,
152 }
153 let list = modio
154 .request(route)
155 .query(&filter)
156 .send::<List<T>>()
157 .await?;
158
159 let state = State {
160 offset: list.offset,
161 limit: list.limit,
162 remaining: list.total - list.count,
163 };
164 let initial = (modio, route, filter, state);
165 let stats = (list.total, list.limit);
166 if list.total == 0 {
167 return Ok((Either::Left(stream::empty()), stats));
168 }
169
170 let first = stream::once(async { Ok::<_, crate::Error>(Page(list)) });
171
172 let others = stream::try_unfold(initial, |(modio, route, filter, state)| async move {
173 if let State { remaining: 0, .. } = state {
174 return Ok(None);
175 }
176 let filter = filter.offset((state.offset + state.limit) as usize);
177 let remaining = state.remaining;
178
179 let list = modio
180 .request(route)
181 .query(&filter)
182 .send::<List<T>>()
183 .await?;
184
185 let state = (
186 modio,
187 route,
188 filter,
189 State {
190 offset: list.offset,
191 limit: list.limit,
192 remaining: remaining - list.count,
193 },
194 );
195
196 Ok(Some((Page(list), state)))
197 });
198
199 Ok((Either::Right(first.chain(others)), stats))
200}
201
202pub struct Page<T>(List<T>);
204
205impl<T> Page<T> {
206 pub fn data(&self) -> &Vec<T> {
207 &self.0.data
208 }
209
210 pub fn into_data(self) -> Vec<T> {
211 self.0.data
212 }
213
214 pub fn current(&self) -> usize {
216 self.0.offset as usize / self.page_size() + 1
217 }
218
219 pub fn page_count(&self) -> usize {
221 (self.total() - 1) / self.page_size() + 1
222 }
223
224 pub fn page_size(&self) -> usize {
226 self.0.limit as usize
227 }
228
229 pub fn total(&self) -> usize {
231 self.0.total as usize
232 }
233}
234
235impl<T> std::ops::Deref for Page<T> {
237 type Target = Vec<T>;
238
239 fn deref(&self) -> &Self::Target {
240 &self.0.data
241 }
242}
243
244impl<'a, T> std::iter::IntoIterator for &'a Page<T> {
245 type Item = &'a T;
246 type IntoIter = std::slice::Iter<'a, T>;
247
248 fn into_iter(self) -> std::slice::Iter<'a, T> {
249 self.0.data.iter()
250 }
251}
252
253impl<T> std::iter::IntoIterator for Page<T> {
254 type Item = T;
255 type IntoIter = std::vec::IntoIter<T>;
256
257 fn into_iter(self) -> std::vec::IntoIter<T> {
258 self.0.data.into_iter()
259 }
260}
261pin_project! {
264 struct ResultStream<St> {
265 total: usize,
266 #[pin]
267 stream: St,
268 }
269}
270
271impl<St: Stream> ResultStream<St> {
272 fn new(total: usize, stream: St) -> ResultStream<St> {
273 Self { total, stream }
274 }
275}
276
277impl<St: Stream> Stream for ResultStream<St> {
278 type Item = St::Item;
279
280 fn size_hint(&self) -> (usize, Option<usize>) {
281 (self.total, None)
282 }
283
284 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
285 self.project().stream.poll_next(cx)
286 }
287}
288
289