Implement SQL transactions

This commit is contained in:
RunasSudo 2024-11-17 22:31:34 +11:00
parent a688ea7c22
commit 30cb94274b
Signed by: RunasSudo
GPG Key ID: 7234E476BF21C61A
8 changed files with 411 additions and 50 deletions

3
src-tauri/Cargo.lock generated
View File

@ -927,14 +927,17 @@ dependencies = [
name = "drcr"
version = "0.1.0"
dependencies = [
"indexmap 2.6.0",
"serde",
"serde_json",
"sqlx",
"tauri",
"tauri-build",
"tauri-plugin-dialog",
"tauri-plugin-shell",
"tauri-plugin-sql",
"tauri-plugin-store",
"tokio",
]
[[package]]

View File

@ -18,11 +18,13 @@ crate-type = ["staticlib", "cdylib", "rlib"]
tauri-build = { version = "2", features = [] }
[dependencies]
tauri = { version = "2", features = [] }
tauri-plugin-shell = "2"
indexmap = { version = "2", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sqlx = { version = "0.8", features = ["json", "time"] }
tauri = { version = "2", features = [] }
tauri-plugin-dialog = "2"
tauri-plugin-shell = "2"
tauri-plugin-sql = { version = "2", features = ["sqlite"] }
tauri-plugin-store = "2"
tokio = { version = "1", features = ["sync"] }

View File

@ -16,32 +16,41 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
mod sql;
use tauri::{AppHandle, Builder, Manager, State};
use tauri_plugin_store::StoreExt;
use tokio::sync::Mutex;
use std::fs;
use std::sync::Mutex;
struct AppState {
db_filename: Option<String>,
sql_transactions: Vec<Option<crate::sql::SqliteTransaction>>,
}
// Filename state
#[tauri::command]
async fn get_open_filename(state: State<'_, Mutex<AppState>>) -> Result<Option<String>, tauri_plugin_sql::Error> {
let state = state.lock().await;
Ok(state.db_filename.clone())
}
#[tauri::command]
fn get_open_filename(state: State<'_, Mutex<AppState>>) -> Option<String> {
let state = state.lock().unwrap();
state.db_filename.clone()
}
#[tauri::command]
fn set_open_filename(state: State<'_, Mutex<AppState>>, app: AppHandle, filename: Option<String>) {
let mut state = state.lock().unwrap();
async fn set_open_filename(state: State<'_, Mutex<AppState>>, app: AppHandle, filename: Option<String>) -> Result<(), tauri_plugin_sql::Error> {
let mut state = state.lock().await;
state.db_filename = filename.clone();
// Persist in store
let store = app.store("store.json").expect("Error opening store");
store.set("db_filename", filename);
Ok(())
}
// Main method
#[cfg_attr(mobile, tauri::mobile_entry_point)]
pub fn run() {
Builder::default()
@ -61,7 +70,8 @@ pub fn run() {
};
app.manage(Mutex::new(AppState {
db_filename: db_filename
db_filename: db_filename,
sql_transactions: Vec::new(),
}));
Ok(())
@ -70,7 +80,10 @@ pub fn run() {
.plugin(tauri_plugin_shell::init())
.plugin(tauri_plugin_sql::Builder::new().build())
.plugin(tauri_plugin_store::Builder::new().build())
.invoke_handler(tauri::generate_handler![get_open_filename, set_open_filename])
.invoke_handler(tauri::generate_handler![
get_open_filename, set_open_filename,
sql::sql_transaction_begin, sql::sql_transaction_execute, sql::sql_transaction_select, sql::sql_transaction_rollback, sql::sql_transaction_commit
])
.run(tauri::generate_context!())
.expect("Error while running tauri application");
}

233
src-tauri/src/sql.rs Normal file
View File

@ -0,0 +1,233 @@
/*
DrCr: Web-based double-entry bookkeeping framework
Copyright (C) 20222024 Lee Yingtong Li (RunasSudo)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use indexmap::IndexMap;
use serde_json::Value as JsonValue;
use sqlx::{Column, Executor, Row, Sqlite, Transaction, TypeInfo, Value, ValueRef};
use sqlx::query::Query;
use sqlx::sqlite::{SqliteArguments, SqliteRow, SqliteValueRef};
use sqlx::types::time::{Date, PrimitiveDateTime, Time};
use tokio::sync::Mutex;
use tauri::State;
use tauri_plugin_sql::{DbInstances, DbPool, Error};
use crate::AppState;
pub type SqliteTransaction = Transaction<'static, Sqlite>;
#[tauri::command]
pub async fn sql_transaction_begin(state: State<'_, Mutex<AppState>>, db_instances: State<'_, DbInstances>, db: String) -> Result<usize, Error> {
let instances = db_instances.0.read().await;
let db = instances.get(&db).ok_or(Error::DatabaseNotLoaded(db))?;
let pool = match db {
DbPool::Sqlite(pool) => pool,
//_ => panic!("Unexpected non-SQLite backend"),
};
// Open transaction
let transaction = pool.begin().await?;
// Store transaction in state
let mut state = state.lock().await;
let available_index = state.sql_transactions.iter().position(|t| t.is_none());
match available_index {
Some(i) => {
state.sql_transactions[i] = Some(transaction);
Ok(i)
}
None => {
state.sql_transactions.push(Some(transaction));
Ok(state.sql_transactions.len() - 1)
}
}
}
#[tauri::command]
pub async fn sql_transaction_execute(state: State<'_, Mutex<AppState>>, transaction_instance_id: usize, query: String, values: Vec<JsonValue>) -> Result<(u64, i64), Error> {
let mut state = state.lock().await;
let transaction =
state.sql_transactions.get_mut(transaction_instance_id)
.expect("Invalid database transaction ID")
.as_mut() // Take reference to transaction rather than moving out of the Vec
.expect("Database transaction ID used after closed");
let query = prepare_query(&query, values);
let result = transaction.execute(query).await?;
Ok((
result.rows_affected(),
result.last_insert_rowid(),
))
}
#[tauri::command]
pub async fn sql_transaction_select(state: State<'_, Mutex<AppState>>, transaction_instance_id: usize, query: String, values: Vec<JsonValue>) -> Result<Vec<IndexMap<String, JsonValue>>, Error> {
let mut state = state.lock().await;
let transaction =
state.sql_transactions.get_mut(transaction_instance_id)
.expect("Invalid database transaction ID")
.as_mut() // Take reference to transaction rather than moving out of the Vec
.expect("Database transaction ID used after closed");
let query = prepare_query(&query, values);
let rows = transaction.fetch_all(query).await?;
rows_to_vec(rows)
}
#[tauri::command]
pub async fn sql_transaction_rollback(state: State<'_, Mutex<AppState>>, transaction_instance_id: usize) -> Result<(), Error> {
let mut state = state.lock().await;
let transaction = state.sql_transactions.get_mut(transaction_instance_id)
.expect("Invalid database transaction ID")
.take() // Remove from Vec
.expect("Database transaction ID used after closed");
transaction.rollback().await?;
Ok(())
}
#[tauri::command]
pub async fn sql_transaction_commit(state: State<'_, Mutex<AppState>>, transaction_instance_id: usize) -> Result<(), Error> {
let mut state = state.lock().await;
let transaction = state.sql_transactions.get_mut(transaction_instance_id)
.expect("Invalid database transaction ID")
.take() // Remove from Vec
.expect("Database transaction ID used after closed");
transaction.commit().await?;
Ok(())
}
fn prepare_query<'a, 'b: 'a>(_query: &'b str, _values: Vec<JsonValue>) -> Query<'b, Sqlite, SqliteArguments<'a>> {
// Copied from tauri_plugin_sql/src/commands.rs
// Copyright 2019-2023 Tauri Programme within The Commons Conservancy
// Licensed under MIT/Apache 2.0
let mut query = sqlx::query(_query);
for value in _values {
if value.is_null() {
query = query.bind(None::<JsonValue>);
} else if value.is_string() {
query = query.bind(value.as_str().unwrap().to_owned())
} else if let Some(number) = value.as_number() {
query = query.bind(number.as_f64().unwrap_or_default())
} else {
query = query.bind(value);
}
}
query
}
fn rows_to_vec(rows: Vec<SqliteRow>) -> Result<Vec<IndexMap<String, JsonValue>>, Error> {
// Copied from tauri_plugin_sql/src/commands.rs
// Copyright 2019-2023 Tauri Programme within The Commons Conservancy
// Licensed under MIT/Apache 2.0
let mut values = Vec::new();
for row in rows {
let mut value = IndexMap::default();
for (i, column) in row.columns().iter().enumerate() {
let v = row.try_get_raw(i)?;
let v = decode_sqlite_to_json(v)?;
value.insert(column.name().to_string(), v);
}
values.push(value);
}
Ok(values)
}
fn decode_sqlite_to_json(v: SqliteValueRef) -> Result<JsonValue, Error> {
// Copied from tauri_plugin_sql/src/decode/sqlite.rs
// Copyright 2019-2023 Tauri Programme within The Commons Conservancy
// Licensed under MIT/Apache 2.0
// Same as tauri_plugin_sql::decode::sqlite::to_json but that function is not exposed
if v.is_null() {
return Ok(JsonValue::Null);
}
let res = match v.type_info().name() {
"TEXT" => {
if let Ok(v) = v.to_owned().try_decode() {
JsonValue::String(v)
} else {
JsonValue::Null
}
}
"REAL" => {
if let Ok(v) = v.to_owned().try_decode::<f64>() {
JsonValue::from(v)
} else {
JsonValue::Null
}
}
"INTEGER" | "NUMERIC" => {
if let Ok(v) = v.to_owned().try_decode::<i64>() {
JsonValue::Number(v.into())
} else {
JsonValue::Null
}
}
"BOOLEAN" => {
if let Ok(v) = v.to_owned().try_decode() {
JsonValue::Bool(v)
} else {
JsonValue::Null
}
}
"DATE" => {
if let Ok(v) = v.to_owned().try_decode::<Date>() {
JsonValue::String(v.to_string())
} else {
JsonValue::Null
}
}
"TIME" => {
if let Ok(v) = v.to_owned().try_decode::<Time>() {
JsonValue::String(v.to_string())
} else {
JsonValue::Null
}
}
"DATETIME" => {
if let Ok(v) = v.to_owned().try_decode::<PrimitiveDateTime>() {
JsonValue::String(v.to_string())
} else {
JsonValue::Null
}
}
"BLOB" => {
if let Ok(v) = v.to_owned().try_decode::<Vec<u8>>() {
JsonValue::Array(v.into_iter().map(|n| JsonValue::Number(n.into())).collect())
} else {
JsonValue::Null
}
}
"NULL" => JsonValue::Null,
_ => return Err(Error::UnsupportedDatatype(v.type_info().name().to_string())),
};
Ok(res)
}

View File

@ -201,12 +201,12 @@
}
}
// Save changes to database
// FIXME: Use transactions
// Save changes to database atomically
const dbTransaction = await session.begin();
if (newTransaction.id === null) {
// Insert new transaction
const result = await session.execute(
const result = await dbTransaction.execute(
`INSERT INTO transactions (dt, description)
VALUES ($1, $2)`,
[newTransaction.dt, newTransaction.description]
@ -214,7 +214,7 @@
newTransaction.id = result.lastInsertId;
} else {
// Update existing transaction
await session.execute(
await dbTransaction.execute(
`UPDATE transactions
SET dt = $1, description = $2
WHERE id = $3`,
@ -233,7 +233,7 @@
if (insertPostings) {
// Delete existing posting if required
if (posting.id !== null) {
await session.execute(
await dbTransaction.execute(
`DELETE FROM postings
WHERE id = $1`,
[posting.id]
@ -241,7 +241,7 @@
}
// Insert new posting
const result = await session.execute(
const result = await dbTransaction.execute(
`INSERT INTO postings (transaction_id, description, account, quantity, commodity, running_balance)
VALUES ($1, $2, $3, $4, $5, NULL)`,
[newTransaction.id, posting.description, posting.account, posting.quantity, posting.commodity]
@ -250,7 +250,7 @@
// Fixup reconciliation if required
const joinedReconciliation = postingsToReconciliations.get(posting);
if (joinedReconciliation) {
await session.execute(
await dbTransaction.execute(
`UPDATE statement_line_reconciliations
SET posting_id = $1
WHERE id = $2`,
@ -259,7 +259,7 @@
}
} else {
// Update existing posting
await session.execute(
await dbTransaction.execute(
`UPDATE postings
SET description = $1, account = $2, quantity = $3, commodity = $4
WHERE id = $5`,
@ -268,7 +268,7 @@
}
// Invalidate running balances
await session.execute(
await dbTransaction.execute(
`UPDATE postings
SET running_balance = NULL
FROM (
@ -282,6 +282,8 @@
);
}
await dbTransaction.commit();
await getCurrentWindow().close();
}
@ -290,30 +292,35 @@
return;
}
const session = await db.load();
// Delete atomically
await session.execute(
`BEGIN;
const session = await db.load();
const dbTransaction = await session.begin();
-- Cascade delete statement line reconciliations
DELETE FROM statement_line_reconciliations
// Cascade delete statement line reconciliations
await dbTransaction.execute(
`DELETE FROM statement_line_reconciliations
WHERE posting_id IN (
SELECT postings.id FROM postings WHERE transaction_id = $1
);
-- Delete postings
DELETE FROM postings
WHERE transaction_id = $1;
-- Delete transaction
DELETE FROM transactions
WHERE id = $1;
COMMIT;`,
)`,
[transaction.id]
);
// Delete postings
await dbTransaction.execute(
`DELETE FROM postings
WHERE transaction_id = $1`,
[transaction.id]
)
// Delete transaction
await dbTransaction.execute(
`DELETE FROM transactions
WHERE id = $1`,
[transaction.id]
)
await dbTransaction.commit();
await getCurrentWindow().close();
}
</script>

View File

@ -23,6 +23,7 @@ import Database from '@tauri-apps/plugin-sql';
import { reactive } from 'vue';
import { asCost, Balance } from './amounts.ts';
import { ExtendedDatabase } from './dbutil.ts';
export const db = reactive({
filename: null as (string | null),
@ -35,7 +36,7 @@ export const db = reactive({
dps: null! as number,
},
init: async function(filename: string) {
init: async function(filename: string): Promise<void> {
// Set the DB filename and initialise cached data
this.filename = filename;
@ -52,13 +53,13 @@ export const db = reactive({
this.metadata.dps = parseInt(metadataObject.amount_dps);
},
load: async function() {
return await Database.load('sqlite:' + this.filename);
load: async function(): Promise<ExtendedDatabase> {
return new ExtendedDatabase(await Database.load('sqlite:' + this.filename));
},
});
export async function totalBalances(session: Database): Promise<{account: string, quantity: number}[]> {
await updateRunningBalances();
export async function totalBalances(session: ExtendedDatabase): Promise<{account: string, quantity: number}[]> {
await updateRunningBalances(session);
return await session.select(`
SELECT p3.account AS account, running_balance AS quantity FROM
@ -73,11 +74,10 @@ export async function totalBalances(session: Database): Promise<{account: string
`);
}
export async function updateRunningBalances() {
export async function updateRunningBalances(session: ExtendedDatabase) {
// TODO: This is very slow - it would be faster to do this in Rust
// Recompute any required running balances
const session = await db.load();
const staleAccountsRaw: {account: string}[] = await session.select('SELECT DISTINCT account FROM postings WHERE running_balance IS NULL');
const staleAccounts: string[] = staleAccountsRaw.map((x) => x.account);
@ -97,6 +97,9 @@ export async function updateRunningBalances() {
staleAccounts
);
// Update running balances atomically
const dbTransaction = await session.begin();
const runningBalances = new Map();
for (const posting of joinedTransactionPostings) {
const openingBalance = runningBalances.get(posting.account) ?? 0;
@ -108,7 +111,7 @@ export async function updateRunningBalances() {
// Update running balance of posting
// Only perform this update if required, to avoid expensive call to DB
if (posting.running_balance !== runningBalance) {
await session.execute(
await dbTransaction.execute(
`UPDATE postings
SET running_balance = $1
WHERE id = $2`,
@ -116,6 +119,8 @@ export async function updateRunningBalances() {
);
}
}
await dbTransaction.commit();
}
export function joinedToTransactions(joinedTransactionPostings: JoinedTransactionPosting[]): Transaction[] {

98
src/dbutil.ts Normal file
View File

@ -0,0 +1,98 @@
/*
DrCr: Web-based double-entry bookkeeping framework
Copyright (C) 20222024 Lee Yingtong Li (RunasSudo)
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { invoke } from '@tauri-apps/api/core';
import Database, { QueryResult } from '@tauri-apps/plugin-sql';
export class ExtendedDatabase {
db: Database;
constructor(db: Database) {
this.db = db;
}
async execute(query: string, bindValues?: unknown[]): Promise<QueryResult> {
return await this.db.execute(query, bindValues);
}
async select<T>(query: string, bindValues?: unknown[]): Promise<T> {
return await this.db.select(query, bindValues);
}
async begin(): Promise<DatabaseTransaction> {
const transactionInstanceId: number = await invoke('sql_transaction_begin', {
db: this.db.path
});
const db_transaction = new DatabaseTransaction(this, transactionInstanceId);
registry.register(db_transaction, transactionInstanceId, db_transaction); // Remember to rollback and close connection on finalization
return db_transaction;
}
}
export class DatabaseTransaction {
db: ExtendedDatabase;
transactionInstanceId: number;
constructor(db: ExtendedDatabase, transactionInstanceId: number) {
this.db = db;
this.transactionInstanceId = transactionInstanceId;
}
async execute(query: string, bindValues?: unknown[]): Promise<QueryResult> {
const [rowsAffected, lastInsertId] = await invoke('sql_transaction_execute', {
transactionInstanceId: this.transactionInstanceId,
query,
values: bindValues ?? []
}) as [number, number];
return {
lastInsertId: lastInsertId,
rowsAffected: rowsAffected
};
}
async select<T>(query: string, bindValues?: unknown[]): Promise<T> {
const result: T = await invoke('sql_transaction_select', {
transactionInstanceId: this.transactionInstanceId,
query,
values: bindValues ?? []
});
return result;
}
async rollback(): Promise<void> {
registry.unregister(this);
await invoke('sql_transaction_rollback', {
transactionInstanceId: this.transactionInstanceId
});
}
async commit(): Promise<void> {
registry.unregister(this);
await invoke('sql_transaction_commit', {
transactionInstanceId: this.transactionInstanceId
});
}
}
const registry = new FinalizationRegistry(async (transactionInstanceId) => {
// Remember to rollback and close connection on finalization
await invoke('sql_transaction_rollback', {
transactionInstanceId: transactionInstanceId
});
});

View File

@ -54,7 +54,7 @@
const session = await db.load();
// Ensure running balances are up to date because we use these
await updateRunningBalances();
await updateRunningBalances(session);
const joinedTransactionPostings: JoinedTransactionPosting[] = await session.select(
`SELECT transaction_id, dt, transactions.description AS transaction_description, postings.id, postings.description, account, quantity, commodity, running_balance