use std::time::Duration; use anyhow::Result; use itertools::Itertools; use llm_readability::extractor; use reqwest::{Client, Url}; use rss::Channel; use tokio::{task::JoinSet, time::sleep}; use warp::Filter; const REQUEST_DELAY: Duration = Duration::from_secs(1); async fn get_feed(url: String, client: &Client) -> Result { let url = urlencoding::decode(&url)?.into_owned(); let content = client.get(url).send().await?.bytes().await?; let channel = Channel::read_from(&content[..])?; Ok(channel) } fn get_domain(item: &rss::Item) -> Option { item.link() .and_then(|link| Url::parse(link).ok()) .and_then(|parsed| parsed.domain().map(|domain| domain.to_string())) } async fn complete(mut channel: Channel, client: &Client) -> Result> { let grouped: Vec> = channel .items() .iter() .chunk_by(|item| get_domain(item)) .into_iter() .map(|(_k, v)| v.cloned().collect()) .collect(); let mut set = JoinSet::new(); for mut items in grouped.into_iter() { let client = client.clone(); set.spawn(async move { for (index, item) in &mut items.iter_mut().enumerate() { if index > 0 { sleep(REQUEST_DELAY).await; } if let Some(ref link) = item.link && let Ok(content) = get_content(link, &client.clone()).await { item.set_description(content); } } items }); } let items: Vec = set.join_all().await.concat(); channel.set_items(items); Ok(Box::new(channel)) } async fn get_content(link: &str, client: &Client) -> Result { let response = client.get(link).send().await?; let content = extractor::extract(&mut response.bytes().await?.as_ref(), &Url::parse(link)?)?.content; Ok(content) } #[derive(Debug)] #[allow(dead_code)] struct CustomReject(anyhow::Error); impl warp::reject::Reject for CustomReject {} pub(crate) fn custom_reject(error: impl Into) -> warp::Rejection { warp::reject::custom(CustomReject(error.into())) } #[tokio::main] async fn main() { let client = Client::new(); let path = warp::path!(String) .and_then(move |url| { let client = client.clone(); async move { let feed = get_feed(url, &client).await.map_err(custom_reject)?; let updated = complete(feed, &client).await.map_err(custom_reject)?; Ok::(format!("{}", updated)) } }) .map(|reply| warp::reply::with_header(reply, "Content-Type", "application/rss+xml")); warp::serve(path).run(([0, 0, 0, 0], 3030)).await; }