modio/
loader.rs

1use std::marker::PhantomData;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures_util::future::Either;
6use futures_util::{stream, Stream, StreamExt, TryStreamExt};
7use pin_project_lite::pin_project;
8use serde::de::DeserializeOwned;
9
10use crate::filter::Filter;
11use crate::routing::Route;
12use crate::types::List;
13use crate::{Modio, Result};
14
15/// Interface for retrieving search results.
16pub struct Query<T> {
17    modio: Modio,
18    route: Route,
19    filter: Filter,
20    phantom: PhantomData<T>,
21}
22
23impl<T> Query<T> {
24    pub(crate) fn new(modio: Modio, route: Route, filter: Filter) -> Self {
25        Self {
26            modio,
27            route,
28            filter,
29            phantom: PhantomData,
30        }
31    }
32}
33
34impl<T: DeserializeOwned + Send> Query<T> {
35    /// Returns the first search result.
36    pub async fn first(mut self) -> Result<Option<T>> {
37        self.filter = self.filter.limit(1);
38        let list = self.first_page().await;
39        list.map(|l| l.into_iter().next())
40    }
41
42    /// Returns the first search result page.
43    pub async fn first_page(self) -> Result<Vec<T>> {
44        let list = self.paged().await?.map_ok(|p| p.0.data).try_next().await;
45        list.map(Option::unwrap_or_default)
46    }
47
48    /// Returns the complete search result list.
49    pub async fn collect(self) -> Result<Vec<T>> {
50        self.paged().await?.map_ok(|p| p.0.data).try_concat().await
51    }
52
53    /// Provides a stream over all search result items.
54    ///
55    /// Beware that a `Filter::with_limit` will NOT limit the number of items returned
56    /// by the stream, but limits the page size for the underlying API requests.
57    ///
58    /// # Example
59    /// ```no_run
60    /// use futures_util::TryStreamExt;
61    /// use modio::filter::prelude::*;
62    /// use modio::types::id::Id;
63    ///
64    /// # use modio::{Credentials, Modio, Result};
65    /// #
66    /// # async fn run() -> Result<()> {
67    /// #     let modio = Modio::new(Credentials::new("apikey"))?;
68    /// let filter = Fulltext::eq("soldier");
69    /// let mut st = modio.game(Id::new(51)).mods().search(filter).iter().await?;
70    ///
71    /// // Stream of `Mod`
72    /// while let Some(mod_) = st.try_next().await? {
73    ///     println!("{}. {}", mod_.id, mod_.name);
74    /// }
75    ///
76    /// use futures_util::StreamExt;
77    ///
78    /// // Retrieve the first 10 mods. (Default page size is `100`.)
79    /// let filter = Fulltext::eq("tftd") + with_limit(10);
80    /// let st = modio.game(Id::new(51)).mods().search(filter).iter().await?;
81    /// let mut st = st.take(10);
82    ///
83    /// // Stream of `Mod`
84    /// while let Some(mod_) = st.try_next().await? {
85    ///     println!("{}. {}", mod_.id, mod_.name);
86    /// }
87    /// #     Ok(())
88    /// # }
89    /// ```
90    #[allow(clippy::iter_not_returning_iterator)]
91    pub async fn iter(self) -> Result<impl Stream<Item = Result<T>>> {
92        let (st, (total, _)) = stream(self.modio, self.route, self.filter).await?;
93        let st = st
94            .map_ok(|list| stream::iter(list.into_iter().map(Ok)))
95            .try_flatten();
96        Ok(Box::pin(ResultStream::new(total as usize, st)))
97    }
98
99    /// Provides a stream over all search result pages.
100    ///
101    /// # Example
102    /// ```no_run
103    /// use futures_util::TryStreamExt;
104    /// use modio::filter::prelude::*;
105    /// use modio::types::id::Id;
106    ///
107    /// # use modio::{Credentials, Modio, Result};
108    /// #
109    /// # async fn run() -> Result<()> {
110    /// #     let modio = Modio::new(Credentials::new("apikey"))?;
111    /// let filter = Fulltext::eq("tftd").limit(10);
112    /// let mut st = modio
113    ///     .game(Id::new(51))
114    ///     .mods()
115    ///     .search(filter)
116    ///     .paged()
117    ///     .await?;
118    ///
119    /// // Stream of paged results `Page<Mod>` with page size = 10
120    /// while let Some(page) = st.try_next().await? {
121    ///     println!("Page {}/{}", page.current(), page.page_count());
122    ///     for item in page {
123    ///         println!("  {}. {}", item.id, item.name);
124    ///     }
125    /// }
126    /// #     Ok(())
127    /// # }
128    /// ```
129    pub async fn paged(self) -> Result<impl Stream<Item = Result<Page<T>>>> {
130        let (st, (total, limit)) = stream(self.modio, self.route, self.filter).await?;
131        let size_hint = if total == 0 {
132            0
133        } else {
134            (total - 1) / limit + 1
135        };
136        Ok(Box::pin(ResultStream::new(size_hint as usize, st)))
137    }
138}
139
140async fn stream<T>(
141    modio: Modio,
142    route: Route,
143    filter: Filter,
144) -> Result<(impl Stream<Item = Result<Page<T>>>, (u32, u32))>
145where
146    T: DeserializeOwned + Send,
147{
148    struct State {
149        offset: u32,
150        limit: u32,
151        remaining: u32,
152    }
153    let list = modio
154        .request(route)
155        .query(&filter)
156        .send::<List<T>>()
157        .await?;
158
159    let state = State {
160        offset: list.offset,
161        limit: list.limit,
162        remaining: list.total - list.count,
163    };
164    let initial = (modio, route, filter, state);
165    let stats = (list.total, list.limit);
166    if list.total == 0 {
167        return Ok((Either::Left(stream::empty()), stats));
168    }
169
170    let first = stream::once(async { Ok::<_, crate::Error>(Page(list)) });
171
172    let others = stream::try_unfold(initial, |(modio, route, filter, state)| async move {
173        if let State { remaining: 0, .. } = state {
174            return Ok(None);
175        }
176        let filter = filter.offset((state.offset + state.limit) as usize);
177        let remaining = state.remaining;
178
179        let list = modio
180            .request(route)
181            .query(&filter)
182            .send::<List<T>>()
183            .await?;
184
185        let state = (
186            modio,
187            route,
188            filter,
189            State {
190                offset: list.offset,
191                limit: list.limit,
192                remaining: remaining - list.count,
193            },
194        );
195
196        Ok(Some((Page(list), state)))
197    });
198
199    Ok((Either::Right(first.chain(others)), stats))
200}
201
202/// A `Page` returned by the [`Query::paged`] stream for a search result.
203pub struct Page<T>(List<T>);
204
205impl<T> Page<T> {
206    pub fn data(&self) -> &Vec<T> {
207        &self.0.data
208    }
209
210    pub fn into_data(self) -> Vec<T> {
211        self.0.data
212    }
213
214    /// Returns the current page number.
215    pub fn current(&self) -> usize {
216        self.0.offset as usize / self.page_size() + 1
217    }
218
219    /// Returns the number of pages.
220    pub fn page_count(&self) -> usize {
221        (self.total() - 1) / self.page_size() + 1
222    }
223
224    /// Returns the size of a page.
225    pub fn page_size(&self) -> usize {
226        self.0.limit as usize
227    }
228
229    /// Returns the total number of the search result.
230    pub fn total(&self) -> usize {
231        self.0.total as usize
232    }
233}
234
235// Impl IntoIterator & Deref for Page<T> {{{
236impl<T> std::ops::Deref for Page<T> {
237    type Target = Vec<T>;
238
239    fn deref(&self) -> &Self::Target {
240        &self.0.data
241    }
242}
243
244impl<'a, T> std::iter::IntoIterator for &'a Page<T> {
245    type Item = &'a T;
246    type IntoIter = std::slice::Iter<'a, T>;
247
248    fn into_iter(self) -> std::slice::Iter<'a, T> {
249        self.0.data.iter()
250    }
251}
252
253impl<T> std::iter::IntoIterator for Page<T> {
254    type Item = T;
255    type IntoIter = std::vec::IntoIter<T>;
256
257    fn into_iter(self) -> std::vec::IntoIter<T> {
258        self.0.data.into_iter()
259    }
260}
261// }}}
262
263pin_project! {
264    struct ResultStream<St> {
265        total: usize,
266        #[pin]
267        stream: St,
268    }
269}
270
271impl<St: Stream> ResultStream<St> {
272    fn new(total: usize, stream: St) -> ResultStream<St> {
273        Self { total, stream }
274    }
275}
276
277impl<St: Stream> Stream for ResultStream<St> {
278    type Item = St::Item;
279
280    fn size_hint(&self) -> (usize, Option<usize>) {
281        (self.total, None)
282    }
283
284    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
285        self.project().stream.poll_next(cx)
286    }
287}
288
289// vim: fdm=marker