Concurrent archives (#1027)

This commit is contained in:
Thomas Zahner 2023-05-11 20:20:27 +02:00 committed by GitHub
parent 4b45f9de89
commit 130fa21a6a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 36 deletions

3
fixtures/INTERNET_ARCHIVE.md vendored Normal file
View file

@ -0,0 +1,3 @@
Testing web archives and the `--suggest` feature.
[A link that used to work in the past](https://www.google.com/jobs.html)

View file

@ -1,18 +1,18 @@
use std::collections::HashSet;
use std::io::{self, Write};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::StreamExt;
use indicatif::ProgressBar;
use indicatif::ProgressStyle;
use reqwest::Url;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use lychee_lib::Status;
use lychee_lib::{Client, Request, Response};
use lychee_lib::{InputSource, Result};
use lychee_lib::{ResponseBody, Status};
use crate::archive::{Archive, Suggestion};
use crate::formatters::response::ResponseFormatter;
@ -91,6 +91,7 @@ where
params.cfg.archive.unwrap_or_default(),
&mut stats,
!params.cfg.no_progress,
max_concurrency,
)
.await;
}
@ -103,23 +104,13 @@ where
Ok((stats, cache_ref, code))
}
async fn suggest_archived_links(archive: Archive, stats: &mut ResponseStats, show_progress: bool) {
let failed_urls = &stats
.fail_map
.iter()
.flat_map(|(source, set)| set.iter().map(|entry| (source, entry)).collect::<Vec<_>>())
.filter(|(_, response)| {
let uri = &response.uri;
!(uri.is_data() || uri.is_mail() || uri.is_file())
})
.filter_map(
|(source, response)| match response.uri.as_str().try_into() {
Ok(url) => Some((source, url)),
Err(_) => None,
},
)
.collect::<Vec<(&InputSource, Url)>>();
async fn suggest_archived_links(
archive: Archive,
stats: &mut ResponseStats,
show_progress: bool,
max_concurrency: usize,
) {
let failed_urls = &get_failed_urls(stats);
let bar = if show_progress {
let bar = init_progress_bar("Searching for alternatives");
bar.set_length(failed_urls.len() as u64);
@ -128,22 +119,28 @@ async fn suggest_archived_links(archive: Archive, stats: &mut ResponseStats, sho
None
};
for (input, url) in failed_urls {
if let Ok(Some(suggestion)) = archive.get_link(url).await {
stats
.suggestion_map
.entry((*input).clone())
.or_default()
.insert(Suggestion {
suggestion,
original: url.clone(),
});
}
let suggestions = Mutex::new(&mut stats.suggestion_map);
if let Some(bar) = &bar {
bar.inc(1);
}
}
futures::stream::iter(failed_urls)
.map(|(input, url)| (input, url, archive.get_link(url)))
.for_each_concurrent(max_concurrency, |(input, url, future)| async {
if let Ok(Some(suggestion)) = future.await {
suggestions
.lock()
.unwrap()
.entry(input.clone())
.or_default()
.insert(Suggestion {
suggestion,
original: url.clone(),
});
}
if let Some(bar) = &bar {
bar.inc(1);
}
})
.await;
if let Some(bar) = &bar {
bar.finish_with_message("Finished searching for alternatives");
@ -213,7 +210,7 @@ async fn request_channel_task(
cache: Arc<Cache>,
accept: Option<HashSet<u16>>,
) {
futures::StreamExt::for_each_concurrent(
StreamExt::for_each_concurrent(
ReceiverStream::new(recv_req),
max_concurrency,
|request: Result<Request>| async {
@ -295,6 +292,27 @@ fn show_progress(
Ok(())
}
/// Collect the failed URLs recorded in `stats`, paired with the input
/// source they were found in, as candidates for a web-archive lookup.
///
/// URIs that cannot have an archived copy (`data:`, `mailto:` and local
/// file URIs) are skipped, and any URI that does not parse into a
/// [`Url`] is silently dropped.
///
/// Takes a shared reference: this function only reads `fail_map`.
/// Callers holding `&mut ResponseStats` still compile unchanged via the
/// implicit `&mut T` -> `&T` reborrow coercion.
fn get_failed_urls(stats: &ResponseStats) -> Vec<(InputSource, Url)> {
    stats
        .fail_map
        .iter()
        .flat_map(|(source, set)| {
            // Pair every failed response in this source's set with the source.
            set.iter().map(move |ResponseBody { uri, .. }| (source, uri))
        })
        // No point querying an archive for data/mail/file URIs.
        .filter(|(_, uri)| !(uri.is_data() || uri.is_mail() || uri.is_file()))
        .filter_map(|(source, uri)| {
            // Archive lookups need a fully-qualified `Url`; drop unparsable URIs.
            Url::try_from(uri.as_str())
                .ok()
                .map(|url| (source.clone(), url))
        })
        .collect()
}
#[cfg(test)]
mod tests {
use log::info;

View file

@ -1149,4 +1149,20 @@ mod cli {
Ok(())
}
#[test]
fn test_suggests_url_alternatives() -> Result<()> {
    // Fixture with a dead link for which the Internet Archive has a snapshot.
    // NOTE(review): this test performs real network requests to
    // web.archive.org and may be flaky offline.
    let fixture = fixtures_path().join("INTERNET_ARCHIVE.md");

    // Exit code 2 signals link failures were found; with `--suggest`
    // the output must list archived alternatives for them.
    main_command()
        .arg("--suggest")
        .arg(fixture)
        .assert()
        .failure()
        .code(2)
        .stdout(contains("Suggestions"))
        .stdout(contains("http://web.archive.org/web/"));

    Ok(())
}
}