run downloads in parallel

Sebastian Hugentobler 2024-07-03 13:43:08 +02:00
parent cf8fbe0965
commit f7c1408cef
Signed by: shu
GPG Key ID: BB32CF3CA052C2F0
3 changed files with 88 additions and 35 deletions

@@ -74,6 +74,12 @@ Login and use the resulting cookie to download all issues from 2024-06-01 until
 nzz-cookie -u 'myuser@example.com' <pw | nzz-download -f 2024-06-01 -t 2024-06-05
 ```
 
+## Caveats
+
+There are no retries on a failed download so far; the tool just crashes. Because
+of that, I would advise against downloading big date ranges at once until this
+is fixed.
+
 ## License
 
 Licensed as [AGPL-3.0](https://www.gnu.org/licenses/agpl-3.0.html).
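The new Caveats section describes the gap: a single failed request currently aborts the whole run. As a sketch of what per-request retries could look like (not part of this commit; the helper name `fetch_with_retry`, the attempt count, and the linear backoff are all assumptions), using only `reqwest` and `tokio` calls the project already depends on:

```rust
use std::time::Duration;

use anyhow::Result;

/// Hypothetical helper (not in this commit): retry a GET a few times
/// with a linear backoff before giving up.
async fn fetch_with_retry(client: &reqwest::Client, url: &str) -> Result<Vec<u8>> {
    const MAX_ATTEMPTS: u64 = 3;
    let mut last_err = None;

    for attempt in 1..=MAX_ATTEMPTS {
        // Treat transport errors and HTTP error statuses both as retryable.
        match client.get(url).send().await.and_then(|r| r.error_for_status()) {
            Ok(response) => return Ok(response.bytes().await?.to_vec()),
            Err(e) => {
                last_err = Some(e);
                if attempt < MAX_ATTEMPTS {
                    tokio::time::sleep(Duration::from_secs(attempt)).await;
                }
            }
        }
    }

    Err(last_err.expect("at least one attempt was made").into())
}
```

Wiring this in would amount to replacing the bare `client.get(page).send().await?` in the new `fetch_issue` below with a call to such a helper.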

@@ -1,16 +1,54 @@
 //! Handle downloads of newspaper issues.
 
 use std::{
-    fs::{self},
+    fs,
     io::{Cursor, Read},
-    path::Path,
+    path::{Path, PathBuf},
+    sync::Arc,
 };
 
 use anyhow::Result;
+use tokio::{spawn, sync::Semaphore, task::JoinSet};
 use tracing::{debug, info};
 
 use crate::{nzz::Issue, pdf};
 
+const MAX_DOWNLOADS: usize = 4;
+
+/// Fetch a single newspaper issue and save the merged pdf to `output_dir`.
+async fn fetch_issue(issue: &Issue, output_dir: PathBuf) -> Result<()> {
+    info!("saving issue {}", issue.publication_date);
+    let client = reqwest::Client::new();
+
+    let tmp_dir = tempfile::tempdir()?;
+    let mut pages = Vec::new();
+
+    for (i, page) in issue.pages.iter().enumerate() {
+        debug!(
+            "fetching issue {}, page {}: {page}",
+            issue.publication_date,
+            i + 1
+        );
+        let response = client.get(page).send().await?;
+        let mut content = Cursor::new(response.bytes().await?);
+        let mut page_data = Vec::new();
+        content.read_to_end(&mut page_data)?;
+
+        let tmp_page = tmp_dir.path().join(i.to_string());
+        fs::write(&tmp_page, page_data)?;
+        pages.push(tmp_page);
+    }
+
+    let issue_name = format!("nzz_{}.pdf", issue.publication_date);
+    let issue_path = output_dir.join(issue_name);
+    let issue_title = format!("NZZ {}", issue.publication_date);
+    pdf::merge(pages, &issue_path, &issue_title)?;
+    debug!("issue {} saved", issue.publication_date);
+
+    Ok(())
+}
+
 /// Download all pages of the provided `issues` and save them merged to the directory `output_dir`.
 ///
 /// Create `output_dir` if it does not exist.

@@ -18,34 +56,23 @@ pub async fn fetch(issues: Vec<Issue>, output_dir: &Path) -> Result<()> {
     debug!("ensuring {output_dir:?} exists");
     fs::create_dir_all(output_dir)?;
 
+    let permits = Arc::new(Semaphore::new(MAX_DOWNLOADS));
+    let mut jobs = JoinSet::new();
+
     for issue in issues {
-        info!("saving issue {}", issue.publication_date);
-
-        let tmp_dir = tempfile::tempdir()?;
-        let mut pages = Vec::new();
-
-        for (i, page) in issue.pages.into_iter().enumerate() {
-            debug!(
-                "fetching issue {}, page {}: {page}",
-                issue.publication_date,
-                i + 1
-            );
-            let response = reqwest::Client::new().get(page).send().await?;
-            let mut content = Cursor::new(response.bytes().await?);
-            let mut page_data = Vec::new();
-            content.read_to_end(&mut page_data)?;
-
-            let tmp_page = tmp_dir.path().join(i.to_string());
-            fs::write(&tmp_page, page_data)?;
-            pages.push(tmp_page);
-        }
-
-        let issue_name = format!("nzz_{}.pdf", issue.publication_date);
-        let issue_path = output_dir.join(issue_name);
-        let issue_title = format!("NZZ {}", issue.publication_date);
-        pdf::merge(pages, &issue_path, &issue_title)?;
-        debug!("issue {} saved", issue.publication_date);
+        let permits = permits.clone();
+        let output_dir = output_dir.to_path_buf();
+
+        let job: tokio::task::JoinHandle<Result<()>> = spawn(async move {
+            let _permit = permits.acquire().await.unwrap();
+            fetch_issue(&issue, output_dir).await?;
+            Ok(())
+        });
+        jobs.spawn(job);
+    }
+
+    while let Some(res) = jobs.join_next().await {
+        res???;
     }
 
     Ok(())
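
This file and `nzz.rs` below introduce the same bounded-concurrency pattern: an `Arc<Semaphore>` caps in-flight work at `MAX_DOWNLOADS`, and a `JoinSet` drives the tasks and collects their results. A condensed, self-contained sketch of that pattern follows (a placeholder job stands in for the real downloads; it assumes `tokio` with the `macros`, `rt-multi-thread`, and `sync` features plus `anyhow`). It spawns each future directly on the `JoinSet`, so draining needs only `??` (join error, then task error) rather than the `???` forced by the commit's extra `spawn` wrapper:

```rust
use std::sync::Arc;

use anyhow::Result;
use tokio::{sync::Semaphore, task::JoinSet};

const MAX_DOWNLOADS: usize = 4;

// Placeholder standing in for the real per-issue work (fetch_issue above).
async fn do_work(id: u32) -> Result<u32> {
    Ok(id * 2)
}

#[tokio::main]
async fn main() -> Result<()> {
    let permits = Arc::new(Semaphore::new(MAX_DOWNLOADS));
    let mut jobs = JoinSet::new();

    for id in 0..16 {
        let permits = permits.clone();
        // Spawn directly on the JoinSet: each task first waits for a
        // semaphore permit, so at most MAX_DOWNLOADS run at once.
        jobs.spawn(async move {
            let _permit = permits.acquire().await?;
            do_work(id).await
        });
    }

    while let Some(res) = jobs.join_next().await {
        // Outer ? surfaces a panicked or cancelled task (JoinError);
        // inner ? propagates the task's own error.
        let value = res??;
        println!("done: {value}");
    }

    Ok(())
}
```

Holding the permit across the whole task body is what bounds the parallelism; the `JoinSet` additionally guarantees every task is awaited before the function returns.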

@@ -1,12 +1,16 @@
 //! Handle information relating to NZZ issues.
 
+use std::sync::Arc;
+
 use anyhow::Result;
 use serde::{Deserialize, Serialize};
 use time::Date;
+use tokio::{spawn, sync::Semaphore, task::JoinSet};
 use tracing::info;
 
 const SEARCH_URL: &str = "https://zeitungsarchiv.nzz.ch/solr-epaper-search/1.0/search";
 const ISSUE_URL: &str = "https://zeitungsarchiv.nzz.ch/archive/1.0/getPages";
+const MAX_DOWNLOADS: usize = 4;
 
 #[derive(Debug, Serialize, Deserialize)]
 struct SearchData {

@@ -173,16 +177,32 @@ async fn build_pages(cookie: &str, edition_id: u32) -> Result<Vec<String>> {
 /// Fetch all page urls for `issues`.
 async fn build_issues(cookie: &str, issues: Vec<IssueData>) -> Result<Vec<Issue>> {
     let mut hydrated_issues = Vec::new();
 
+    let permits = Arc::new(Semaphore::new(MAX_DOWNLOADS));
+    let mut jobs = JoinSet::new();
+
     for issue in issues {
-        info!(
-            "fetching page information for issue {}",
-            issue.publication_date
-        );
-        let pages = build_pages(cookie, issue.edition_id).await?;
-        hydrated_issues.push(Issue {
-            publication_date: issue.publication_date,
-            pages,
-        });
+        let permits = permits.clone();
+        let cookie = cookie.to_string();
+
+        let job: tokio::task::JoinHandle<Result<Issue>> = spawn(async move {
+            let _permit = permits.acquire().await.unwrap();
+
+            info!(
+                "fetching page information for issue {}",
+                issue.publication_date
+            );
+            let pages = build_pages(&cookie, issue.edition_id).await?;
+            Ok(Issue {
+                publication_date: issue.publication_date,
+                pages,
+            })
+        });
+        jobs.spawn(job);
+    }
+
+    while let Some(res) = jobs.join_next().await {
+        let issue: Issue = res???;
+        hydrated_issues.push(issue);
     }
 
     Ok(hydrated_issues)