Execute reporting steps in parallel

This commit is contained in:
RunasSudo 2025-05-27 17:28:34 +10:00
parent aa5238917e
commit 280a2090d9
Signed by: RunasSudo
GPG Key ID: 7234E476BF21C61A
4 changed files with 71 additions and 17 deletions

View File

@ -16,6 +16,8 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use chrono::NaiveDate;
use libdrcr::db::DbConnection;
use libdrcr::reporting::builders::register_dynamic_builders;
@ -41,10 +43,11 @@ async fn main() {
NaiveDate::from_ymd_opt(2025, 6, 30).unwrap(),
"$".to_string(),
);
register_lookup_fns(&mut context);
register_dynamic_builders(&mut context);
let context = Arc::new(context);
// Print Graphviz
let targets = vec![
@ -86,7 +89,9 @@ async fn main() {
},
];
let products = generate_report(targets, &context).await.unwrap();
let products = generate_report(targets, Arc::clone(&context))
.await
.unwrap();
let result = products
.get_or_err(&ReportingProductId {
name: "AllTransactionsExceptEarningsToEquity",
@ -120,7 +125,9 @@ async fn main() {
},
];
let products = generate_report(targets, &context).await.unwrap();
let products = generate_report(targets, Arc::clone(&context))
.await
.unwrap();
let result = products
.get_or_err(&ReportingProductId {
name: "BalanceSheet",

View File

@ -179,7 +179,7 @@ fn build_step_for_product(
}
/// Check whether the [ReportingStep] would be ready to execute, if the given previous steps have already completed
fn would_be_ready_to_execute(
pub(crate) fn would_be_ready_to_execute(
step: &Box<dyn ReportingStep>,
steps: &Vec<Box<dyn ReportingStep>>,
dependencies: &ReportingGraphDependencies,

View File

@ -16,10 +16,12 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use tokio::sync::RwLock;
use std::sync::Arc;
use tokio::{sync::RwLock, task::JoinSet};
use super::{
calculator::ReportingGraphDependencies,
calculator::{would_be_ready_to_execute, ReportingGraphDependencies},
types::{ReportingContext, ReportingProducts, ReportingStep},
};
@ -28,19 +30,62 @@ pub enum ReportingExecutionError {
DependencyNotAvailable { message: String },
}
async fn execute_step(
step_idx: usize,
steps: Arc<Vec<Box<dyn ReportingStep>>>,
dependencies: Arc<ReportingGraphDependencies>,
context: Arc<ReportingContext>,
products: Arc<RwLock<ReportingProducts>>,
) -> (usize, Result<ReportingProducts, ReportingExecutionError>) {
let step = &steps[step_idx];
let result = step
.execute(&*context, &*steps, &*dependencies, &*products)
.await;
(step_idx, result)
}
pub async fn execute_steps(
steps: Vec<Box<dyn ReportingStep>>,
dependencies: ReportingGraphDependencies,
context: &ReportingContext,
context: Arc<ReportingContext>,
) -> Result<ReportingProducts, ReportingExecutionError> {
let products = RwLock::new(ReportingProducts::new());
let products = Arc::new(RwLock::new(ReportingProducts::new()));
for step in steps.iter() {
// Execute the step
// TODO: Do this in parallel
let mut new_products = step
.execute(context, &steps, &dependencies, &products)
.await?;
// Prepare for async
let steps = Arc::new(steps);
let dependencies = Arc::new(dependencies);
// Execute steps asynchronously
let mut handles = JoinSet::new();
let mut steps_done = Vec::new();
let mut steps_remaining = (0..steps.len()).collect::<Vec<_>>();
while steps_done.len() != steps.len() {
// Execute each step which is ready to run
for step_idx in steps_remaining.iter().copied().collect::<Vec<_>>() {
// Check if ready to run
if would_be_ready_to_execute(&steps[step_idx], &steps, &dependencies, &steps_done) {
// Spawn new task
// Unfortunately the compiler cannot guarantee lifetimes are correct, so we must pass Arc across thread boundaries
handles.spawn(execute_step(
step_idx,
Arc::clone(&steps),
Arc::clone(&dependencies),
Arc::clone(&context),
Arc::clone(&products),
));
steps_remaining
.remove(steps_remaining.iter().position(|i| *i == step_idx).unwrap());
}
}
// Join next result
let (step_idx, result) = handles.join_next().await.unwrap().unwrap();
let step = &steps[step_idx];
steps_done.push(step_idx);
let mut new_products = result?;
// Sanity check the new products
for (product_id, _product) in new_products.map().iter() {
@ -71,5 +116,5 @@ pub async fn execute_steps(
products.write().await.append(&mut new_products);
}
Ok(products.into_inner())
Ok(Arc::into_inner(products).unwrap().into_inner())
}

View File

@ -16,6 +16,8 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::sync::Arc;
use calculator::{steps_for_targets, ReportingCalculationError};
use executor::{execute_steps, ReportingExecutionError};
use types::{ReportingContext, ReportingProductId, ReportingProducts};
@ -50,10 +52,10 @@ impl From<ReportingExecutionError> for ReportingError {
/// Helper function to call [steps_for_targets] followed by [execute_steps].
pub async fn generate_report(
targets: Vec<ReportingProductId>,
context: &ReportingContext,
context: Arc<ReportingContext>,
) -> Result<ReportingProducts, ReportingError> {
// Solve dependencies
let (sorted_steps, dependencies) = steps_for_targets(targets, context)?;
let (sorted_steps, dependencies) = steps_for_targets(targets, &*context)?;
// Execute steps
let products = execute_steps(sorted_steps, dependencies, context).await?;