lychee/examples/client_pool/client_pool.rs

use lychee_lib::{ClientBuilder, Request, Result};
use std::convert::TryFrom;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
const CONCURRENT_REQUESTS: usize = 4;
#[tokio::main]
async fn main() -> Result<()> {
    // These channels are used to send requests and receive responses to and
    // from lychee
    let (send_req, recv_req) = mpsc::channel(CONCURRENT_REQUESTS);
    let (send_resp, mut recv_resp) = mpsc::channel(CONCURRENT_REQUESTS);
    // Add as many requests as you like
    let requests = vec![Request::try_from("https://example.com")?];
    // Queue requests
    tokio::spawn(async move {
        for request in requests {
            send_req.send(request).await.unwrap();
        }
    });
    // Create a default lychee client
    let client = ClientBuilder::default().client()?;
    // Start receiving requests
    // Requests get streamed into the client and run concurrently
    tokio::spawn(async move {
        futures::StreamExt::for_each_concurrent(
            ReceiverStream::new(recv_req),
            CONCURRENT_REQUESTS,
            |req| async {
                let resp = client.check(req).await.unwrap();
                send_resp.send(resp).await.unwrap();
            },
        )
        .await;
    });
    // Finally, listen to incoming responses from lychee
    while let Some(response) = recv_resp.recv().await {
        println!("{response}");
    }
    Ok(())
}