DrCr/drcr/models.py
RunasSudo 18392d6917
Basic caching of running balances
About 60% performance improvement on balance report
2024-11-09 23:38:56 +11:00

344 lines
12 KiB
Python

# DrCr: Web-based double-entry bookkeeping framework
# Copyright (C) 2022–2024 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from markupsafe import Markup
from . import AMOUNT_DPS
from .database import db
import functools
from itertools import groupby
class Transaction(db.Model):
    """A dated, described set of postings stored in the ``transactions`` table."""

    __tablename__ = 'transactions'

    id = db.Column(db.Integer, primary_key=True)
    dt = db.Column(db.DateTime)
    description = db.Column(db.String)

    # Postings are owned by the transaction: removing one from the list
    # (or deleting the transaction) deletes the posting row as well.
    postings = db.relationship('Posting', back_populates='transaction', cascade='all, delete-orphan')

    def __init__(self, dt=None, description=None, postings=None):
        """Create a transaction; a missing or empty *postings* becomes a fresh list."""
        self.dt = dt
        self.description = description
        self.postings = postings if postings else []

    def assert_valid(self):
        """Check that the postings balance when measured at cost.

        Positive cost amounts count as debits and negative ones as credits.

        Raises:
            AssertionError: if total debits differ from total credits.
        """
        costs = [posting.amount().as_cost().quantity for posting in self.postings]
        total_debit = sum(cost for cost in costs if cost > 0)
        total_credit = -sum(cost for cost in costs if cost < 0)

        if total_debit == total_credit:
            return

        raise AssertionError('Transaction debits ({}) and credits ({}) do not balance'.format(total_debit, total_credit))
class Posting(db.Model):
    """One line of a Transaction: a quantity of a commodity posted to an account."""

    __tablename__ = 'postings'

    id = db.Column(db.Integer, primary_key=True)
    transaction_id = db.Column(db.Integer, db.ForeignKey('transactions.id'))

    description = db.Column(db.String)
    account = db.Column(db.String)
    quantity = db.Column(db.Integer)
    commodity = db.Column(db.String)

    # Running balance of the account in units of reporting_commodity
    # Only takes into consideration Transactions stored in database, not API-generated ones
    running_balance = db.Column(db.Integer)

    transaction = db.relationship('Transaction', back_populates='postings')

    def __init__(self, description=None, account=None, quantity=None, commodity=None):
        """Create a posting; it is linked to a transaction through the relationship."""
        self.description = description
        self.account = account
        self.quantity = quantity
        self.commodity = commodity

    def amount(self):
        """Return this posting's quantity and commodity wrapped in a new Amount."""
        return Amount(quantity=self.quantity, commodity=self.commodity)
class Amount:
    """A quantity of a single commodity.

    ``quantity`` is an integer scaled by ``10**AMOUNT_DPS`` (so it counts the
    smallest displayable unit). ``commodity`` is a string; it may embed a cost
    base as ``{unit cost}`` or ``{{total cost}}``, which ``as_cost`` reads.
    """

    __slots__ = ['quantity', 'commodity']

    def __init__(self, quantity, commodity):
        self.quantity = quantity
        self.commodity = commodity

    @classmethod
    def parse(cls, amount_str):
        """Parse ``'<number>'`` or ``'<number> <commodity>'`` into an Amount.

        Everything after the first space is the commodity; with no space the
        reporting commodity is used.

        Raises:
            ValueError: if the numeric part cannot be converted by ``float``.
        """
        quantity_str, separator, commodity = amount_str.partition(' ')
        quantity = round(float(quantity_str) * (10**AMOUNT_DPS))
        if not separator:
            # Default commodity
            commodity = reporting_commodity()
        return cls(quantity, commodity)

    @staticmethod
    def _format_number(quantity, thousands=False):
        """Render a scaled integer quantity as a fixed-point decimal string.

        With ``thousands`` set, groups of three digits are separated by the
        HTML entity for a narrow no-break space, so the result is HTML, not
        plain text.
        """
        text = '{:{grouping}.{dps}f}'.format(
            quantity / (10**AMOUNT_DPS),
            grouping=',' if thousands else '',
            dps=AMOUNT_DPS,
        )
        if thousands:
            text = text.replace(',', '&#x202F;')
        return text

    def __repr__(self):
        return '<{}: {}>'.format(self.__class__.__name__, self.format('force'))

    def __abs__(self):
        return Amount(abs(self.quantity), self.commodity)

    def __neg__(self):
        return Amount(-self.quantity, self.commodity)

    def __add__(self, other):
        """Add two amounts of the same commodity.

        Raises:
            ValueError: if the commodities differ.
        """
        if self.commodity != other.commodity:
            raise ValueError('Cannot add incompatible commodities {} and {}'.format(self.commodity, other.commodity))
        return Amount(self.quantity + other.quantity, self.commodity)

    def __sub__(self, other):
        return self + (-other)

    def clone(self):
        """Return an independent copy of this amount."""
        return Amount(self.quantity, self.commodity)

    def format(self, commodity='non_reporting'):
        """Return the amount as HTML-safe ``Markup`` with thousands separators.

        Args:
            commodity: ``'hide'`` never shows the commodity; ``'non_reporting'``
                and ``'force'`` show it unless it is the reporting commodity.

        Raises:
            ValueError: if *commodity* is not one of the three options.
        """
        if commodity not in ('non_reporting', 'force', 'hide'):
            raise ValueError('Invalid commodity reporting option')

        # Safe to mark as Markup: digits, sign, '.', and the separator entity only
        number = Markup(self._format_number(self.quantity, thousands=True))

        # NOTE(review): 'force' currently behaves exactly like 'non_reporting';
        # confirm whether it was meant to show the reporting commodity too.
        if self.commodity == reporting_commodity() or commodity == 'hide':
            return number

        # The commodity is free text, so let Markup.format escape it rather
        # than interpolating it into trusted HTML.
        if len(self.commodity) == 1:
            # Single-character commodities are written as a prefix symbol
            return Markup('{}{}').format(self.commodity, number)
        return Markup('{} {}').format(number, self.commodity)

    def format_accounting(self, link=None):
        """Return the amount in accounting style as HTML-safe ``Markup``.

        Negative amounts are wrapped in parentheses; non-negative amounts get
        a trailing ``&nbsp;`` in place of the closing parenthesis. The
        commodity is never shown.

        Args:
            link: optional URL; when given, the number is wrapped in an
                anchor. The URL is HTML-escaped before being placed in the
                ``href`` attribute.
        """
        if self.quantity >= 0:
            text = self._format_number(self.quantity, thousands=True)
            space = '&nbsp;'
        else:
            text = '({})'.format(self._format_number(-self.quantity, thousands=True))
            space = ''

        if link is None:
            return Markup(text + space)

        # text and space are generated above and contain only trusted markup;
        # link comes from the caller and is escaped by Markup.format.
        return Markup('<a href="{}" class="hover:text-blue-700 hover:underline">{}</a>{}').format(
            link, Markup(text), Markup(space)
        )

    def quantity_string(self):
        """Return the amount as plain text without thousands separators.

        The commodity is omitted when it is the reporting commodity.
        """
        number = self._format_number(self.quantity)
        if self.commodity == reporting_commodity():
            return number
        if len(self.commodity) == 1:
            return '{}{}'.format(self.commodity, number)
        return '{} {}'.format(number, self.commodity)

    def as_cost(self):
        """Convert commodity to reporting currency in cost basis

        Returns ``self`` unchanged when already in the reporting commodity.

        Raises:
            Exception: if the commodity carries no ``{...}`` cost base.
        """
        if self.commodity == reporting_commodity():
            return self

        # TODO: Refactor this
        if '{{' in self.commodity:
            # '{{x}}' is the total cost of the whole lot; it takes the sign of the quantity
            cost = float(self.commodity[self.commodity.index('{{')+2:self.commodity.index('}}')])
            if self.quantity < 0:
                cost = -cost
            return Amount(round(cost * (10**AMOUNT_DPS)), reporting_commodity())
        elif '{' in self.commodity:
            # '{x}' is the cost per unit; quantity is already scaled by 10**AMOUNT_DPS
            cost = float(self.commodity[self.commodity.index('{')+1:self.commodity.index('}')])
            return Amount(round(cost * self.quantity), reporting_commodity())  # FIXME: Custom reporting currency
        else:
            raise Exception('No cost base for commodity {}'.format(self.commodity))
class Balance:
    """A collection of Amounts, holding at most one entry per commodity."""

    def __init__(self):
        self.amounts = []

    def clone(self):
        """Return a copy whose amounts are themselves copies."""
        balance = Balance()
        balance.amounts = [a.clone() for a in self.amounts]
        return balance

    def add(self, rhs):
        """Add the Amount *rhs* to the entry for its commodity.

        A commodity seen for the first time gets a new entry. A copy of *rhs*
        is stored, not *rhs* itself, so that later additions to this balance
        never change the caller's object.
        """
        amount = next((a for a in self.amounts if a.commodity == rhs.commodity), None)
        if amount is None:
            self.amounts.append(rhs.clone())
        else:
            amount.quantity += rhs.quantity

    def clean(self):
        """Remove zero amounts"""
        self.amounts = [a for a in self.amounts if a.quantity != 0]
class TrialBalancer:
    """
    Applies transactions to generate a trial balance

    ``accounts`` maps each account name to an Amount in the reporting commodity.
    """

    # First SELECT the last applicable dt by account
    # Then, among the transactions with that dt, SELECT the last applicable transaction_id
    # Then extract the running_balance for each account at that transaction_id
    #
    # {date_filter} is replaced by a constant WHERE clause (or nothing); date
    # values themselves are always passed as bound parameters.
    _RUNNING_BALANCE_QUERY = '''
        SELECT p3.account, running_balance FROM
        (
            SELECT p1.account, max(p2.transaction_id) AS max_tid FROM
            (
                SELECT account, max(dt) AS max_dt FROM postings JOIN transactions ON postings.transaction_id = transactions.id {date_filter} GROUP BY account
            ) p1
            JOIN postings p2 ON p1.account = p2.account
            JOIN transactions ON p2.transaction_id = transactions.id AND p1.max_dt = transactions.dt
            GROUP BY p1.account
        ) p3
        JOIN postings p4 ON p3.account = p4.account AND p3.max_tid = p4.transaction_id
    '''

    def __init__(self):
        self.accounts = {}

    @classmethod
    def _cached_balances(cls, date_filter='', params=None):
        """Build a TrialBalancer from the cached running_balance of each account's last posting.

        Args:
            date_filter: trusted SQL fragment restricting which transactions
                are considered (e.g. ``'WHERE dt <= :end_date'``). Never build
                it from user input.
            params: values for the bind parameters named in *date_filter*.
        """
        balancer = cls()
        query = db.text(cls._RUNNING_BALANCE_QUERY.format(date_filter=date_filter))

        # NOTE(review): if one transaction has several postings to the same
        # account, several rows come back for it and the last row read wins.
        for row in db.session.execute(query, params or {}).all():
            balancer.accounts[row.account] = Amount(row.running_balance, reporting_commodity())

        return balancer

    @classmethod
    def from_cached(cls, start_date=None, end_date=None):
        """Obtain a TrialBalancer based on the cached running_balance

        Args:
            start_date: if given, balances from transactions dated strictly
                before it are subtracted, leaving only movements since then.
            end_date: if given, only transactions dated on or before it are
                considered; otherwise all stored transactions are.
        """
        if end_date is None:
            result = cls._cached_balances()
        else:
            result = cls._cached_balances('WHERE dt <= :end_date', {'end_date': end_date})

        # Subtract balances at start_date from balances at end_date if required
        if start_date is not None:
            opening = cls._cached_balances('WHERE dt < :start_date', {'start_date': start_date})
            for account, amount in result.accounts.items():
                # If account not in opening, then the balance at start_date was necessarily 0 and subtraction is not required
                opening_amount = opening.accounts.get(account)
                if opening_amount is not None:
                    amount.quantity -= opening_amount.quantity

        return result

    def apply_transactions(self, transactions):
        """Add the cost of every posting in *transactions* to its account's balance."""
        for transaction in transactions:
            for posting in transaction.postings:
                if posting.account not in self.accounts:
                    self.accounts[posting.account] = Amount(0, reporting_commodity())

                # FIXME: Handle commodities better (ensure compatible commodities)
                self.accounts[posting.account].quantity += posting.amount().as_cost().quantity

    def transfer_balance(self, source_account, destination_account, description=None):
        """Transfer the balance of the source account to the destination account

        The source account is removed afterwards. Nothing happens when the two
        names are equal or the source account does not exist. *description*
        is currently unused.
        """
        # TODO: Keep a record of internal transactions?

        if source_account == destination_account:
            # Don't do anything in this case!!
            return

        if source_account not in self.accounts:
            return

        if destination_account not in self.accounts:
            self.accounts[destination_account] = Amount(0, reporting_commodity())

        # FIXME: Handle commodities
        self.accounts[destination_account].quantity += self.accounts[source_account].quantity
        del self.accounts[source_account]
class AccountConfiguration(db.Model):
    """A configuration entry (a ``kind`` plus optional JSON ``data``) attached to an account."""

    __tablename__ = 'account_configurations'

    id = db.Column(db.Integer, primary_key=True)
    account = db.Column(db.String)
    kind = db.Column(db.String)
    data = db.Column(db.JSON)

    @staticmethod
    def get_all():
        """Return a dict mapping each account name to the list of its configurations."""
        # groupby only merges adjacent rows, hence the ORDER BY on the same key
        query = db.select(AccountConfiguration).order_by(AccountConfiguration.account)
        rows = db.session.execute(query).scalars()

        by_account = {}
        for account, configurations in groupby(rows, lambda c: c.account):
            by_account[account] = list(configurations)
        return by_account

    @staticmethod
    def get_all_kinds():
        """Return a dict mapping each account name to the list of its configuration kinds."""
        kinds = {}
        for account, configurations in AccountConfiguration.get_all().items():
            kinds[account] = [configuration.kind for configuration in configurations]
        return kinds
# ----------------
# Metadata helpers
class Metadata(db.Model):
    """A key/value setting stored in the ``metadata`` table."""

    __tablename__ = 'metadata'

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String)
    value = db.Column(db.String)

    @staticmethod
    def get(key):
        """Return the value stored under *key*.

        ``one()`` raises unless exactly one row matches.
        """
        row = Metadata.query.filter_by(key=key).one()
        return row.value
@functools.cache  # Very poor performance if result is not cached!
def reporting_commodity():
    """Return the native reporting commodity stored in the metadata table.

    The first result is memoised, so later changes to the stored value are
    not seen until the cache is cleared.
    """
    commodity = Metadata.get('reporting_commodity')
    return commodity