# DrCr: Web-based double-entry bookkeeping framework # Copyright (C) 2022–2024 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from markupsafe import Markup from . import AMOUNT_DPS from .database import db import functools from itertools import groupby class Transaction(db.Model): __tablename__ = 'transactions' id = db.Column(db.Integer, primary_key=True) dt = db.Column(db.DateTime) description = db.Column(db.String) postings = db.relationship('Posting', back_populates='transaction', cascade='all, delete-orphan') def __init__(self, dt=None, description=None, postings=None): self.dt = dt self.description = description self.postings = postings or [] def assert_valid(self): """Assert that debits equal credits""" total_dr = 0 total_cr = 0 for posting in self.postings: amount_cost = posting.amount().as_cost().quantity if amount_cost > 0: total_dr += amount_cost elif amount_cost < 0: total_cr -= amount_cost if total_dr != total_cr: raise AssertionError('Transaction debits ({}) and credits ({}) do not balance'.format(total_dr, total_cr)) class Posting(db.Model): __tablename__ = 'postings' id = db.Column(db.Integer, primary_key=True) transaction_id = db.Column(db.Integer, db.ForeignKey('transactions.id')) description = db.Column(db.String) account = db.Column(db.String) quantity = db.Column(db.Integer) commodity = db.Column(db.String) # Running balance of the account in units of reporting_commodity # Only takes into consideration Transactions stored in database, not API-generated ones running_balance = db.Column(db.Integer) transaction = db.relationship('Transaction', back_populates='postings') def __init__(self, description=None, account=None, quantity=None, commodity=None): self.description = description self.account = account self.quantity = quantity self.commodity = commodity def amount(self): return Amount(self.quantity, self.commodity) class Amount: __slots__ = ['quantity', 'commodity'] def __init__(self, quantity, commodity): self.quantity = quantity self.commodity = commodity @classmethod def parse(self, amount_str): if ' ' not in amount_str: # Default commodity quantity = round(float(amount_str) * (10**AMOUNT_DPS)) return Amount(quantity, reporting_commodity()) quantity_str = amount_str[:amount_str.index(' ')] quantity = round(float(quantity_str) * (10**AMOUNT_DPS)) commodity = amount_str[amount_str.index(' ')+1:] return Amount(quantity, commodity) def __repr__(self): return '<{}: {}>'.format(self.__class__.__name__, self.format('force')) def __abs__(self): return Amount(abs(self.quantity), self.commodity) def __neg__(self): return Amount(-self.quantity, self.commodity) def __add__(self, other): if self.commodity != other.commodity: raise ValueError('Cannot add incompatible commodities {} and {}'.format(self.commodity, other.commodity)) return Amount(self.quantity + other.quantity, self.commodity) def __sub__(self, other): return self + (-other) def clone(self): return Amount(self.quantity, self.commodity) def format(self, commodity='non_reporting'): if commodity not in ('non_reporting', 'force', 'hide'): raise ValueError('Invalid commodity reporting option') if (self.commodity == reporting_commodity() and commodity in ('non_reporting', 'force')) or commodity == 'hide': return Markup('{:,.{dps}f}'.format(self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' ')) elif len(self.commodity) == 1: return Markup('{0}{1:,.{dps}f}'.format(self.commodity, self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' ')) else: return Markup('{1:,.{dps}f} {0}'.format(self.commodity, self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' ')) def format_accounting(self, link=None): if self.quantity >= 0: text = '{:,.{dps}f}'.format(self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' ') space = ' ' else: text = '({:,.{dps}f})'.format(-self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' ') space = '' if link is None: return Markup(text + space) else: return Markup('{}{}'.format(link, text, space)) def quantity_string(self): if self.commodity == reporting_commodity(): return '{:.{dps}f}'.format(self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS) elif len(self.commodity) == 1: return '{0}{1:.{dps}f}'.format(self.commodity, self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS) else: return '{1:.{dps}f} {0}'.format(self.commodity, self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS) def as_cost(self): """Convert commodity to reporting currency in cost basis""" if self.commodity == reporting_commodity(): return self # TODO: Refactor this if '{{' in self.commodity: cost = float(self.commodity[self.commodity.index('{{')+2:self.commodity.index('}}')]) if self.quantity < 0: cost = -cost return Amount(round(cost * (10**AMOUNT_DPS)), reporting_commodity()) elif '{' in self.commodity: cost = float(self.commodity[self.commodity.index('{')+1:self.commodity.index('}')]) return Amount(round(cost * self.quantity), reporting_commodity()) # FIXME: Custom reporting currency else: raise Exception('No cost base for commodity {}'.format(self.commodity)) class Balance: """A collection of Amount's""" def __init__(self): self.amounts = [] def clone(self): balance = Balance() balance.amounts = [a.clone() for a in self.amounts] return balance def add(self, rhs): amount = next((a for a in self.amounts if a.commodity == rhs.commodity), None) if amount is None: self.amounts.append(rhs) else: amount.quantity += rhs.quantity def clean(self): """Remove zero amounts""" self.amounts = [a for a in self.amounts if a.quantity != 0] class TrialBalancer: """ Applies transactions to generate a trial balance """ def __init__(self): self.accounts = {} @classmethod def from_cached(cls, start_date=None, end_date=None): """Obtain a TrialBalancer based on the cached running_balance""" if start_date is not None: result_start_date = cls() # First SELECT the last applicable dt by account # Then, among the transactions with that dt, SELECT the last applicable transaction_id # Then extract the running_balance for each account at that transaction_id running_balances = db.session.execute(''' SELECT p3.account, running_balance FROM ( SELECT p1.account, max(p2.transaction_id) AS max_tid FROM ( SELECT account, max(dt) AS max_dt FROM postings JOIN transactions ON postings.transaction_id = transactions.id WHERE dt < :start_date GROUP BY account ) p1 JOIN postings p2 ON p1.account = p2.account AND p1.max_dt = transactions.dt JOIN transactions ON p2.transaction_id = transactions.id GROUP BY p2.account ) p3 JOIN postings p4 ON p3.account = p4.account AND p3.max_tid = p4.transaction_id ''', {'start_date': start_date}) for running_balance in running_balances.all(): result_start_date.accounts[running_balance.account] = Amount(running_balance.running_balance, reporting_commodity()) if end_date is None: result = cls() running_balances = db.session.execute(''' SELECT p3.account, running_balance FROM ( SELECT p1.account, max(p2.transaction_id) AS max_tid FROM ( SELECT account, max(dt) AS max_dt FROM postings JOIN transactions ON postings.transaction_id = transactions.id GROUP BY account ) p1 JOIN postings p2 ON p1.account = p2.account AND p1.max_dt = transactions.dt JOIN transactions ON p2.transaction_id = transactions.id GROUP BY p2.account ) p3 JOIN postings p4 ON p3.account = p4.account AND p3.max_tid = p4.transaction_id ''') for running_balance in running_balances.all(): result.accounts[running_balance.account] = Amount(running_balance.running_balance, reporting_commodity()) if end_date is not None: result = cls() running_balances = db.session.execute(''' SELECT p3.account, running_balance FROM ( SELECT p1.account, max(p2.transaction_id) AS max_tid FROM ( SELECT account, max(dt) AS max_dt FROM postings JOIN transactions ON postings.transaction_id = transactions.id WHERE dt <= :end_date GROUP BY account ) p1 JOIN postings p2 ON p1.account = p2.account AND p1.max_dt = transactions.dt JOIN transactions ON p2.transaction_id = transactions.id GROUP BY p2.account ) p3 JOIN postings p4 ON p3.account = p4.account AND p3.max_tid = p4.transaction_id ''', {'end_date': end_date}) for running_balance in running_balances.all(): result.accounts[running_balance.account] = Amount(running_balance.running_balance, reporting_commodity()) # Subtract balances at start_date from balances at end_date if required if start_date is not None: for k in result.accounts.keys(): # If k not in result_start_date, then the balance at start_date was necessarily 0 and subtraction is not required if k in result_start_date.accounts: result.accounts[k].quantity -= result_start_date.accounts[k].quantity return result def apply_transactions(self, transactions): for transaction in transactions: for posting in transaction.postings: if posting.account not in self.accounts: self.accounts[posting.account] = Amount(0, reporting_commodity()) # FIXME: Handle commodities better (ensure compatible commodities) self.accounts[posting.account].quantity += posting.amount().as_cost().quantity def transfer_balance(self, source_account, destination_account, description=None): """Transfer the balance of the source account to the destination account""" # TODO: Keep a record of internal transactions? if source_account == destination_account: # Don't do anything in this case!! return if source_account not in self.accounts: return if destination_account not in self.accounts: self.accounts[destination_account] = Amount(0, reporting_commodity()) # FIXME: Handle commodities self.accounts[destination_account].quantity += self.accounts[source_account].quantity del self.accounts[source_account] class AccountConfiguration(db.Model): __tablename__ = 'account_configurations' id = db.Column(db.Integer, primary_key=True) account = db.Column(db.String) kind = db.Column(db.String) data = db.Column(db.JSON) @staticmethod def get_all(): account_configurations = db.session.execute(db.select(AccountConfiguration).order_by(AccountConfiguration.account)).scalars() account_configurations = {v: list(g) for v, g in groupby(account_configurations, lambda c: c.account)} return account_configurations @staticmethod def get_all_kinds(): account_configurations = AccountConfiguration.get_all() kinds = {k: [vv.kind for vv in v] for k, v in account_configurations.items()} return kinds # ---------------- # Metadata helpers class Metadata(db.Model): __tablename__ = 'metadata' id = db.Column(db.Integer, primary_key=True) key = db.Column(db.String) value = db.Column(db.String) @staticmethod def get(key): return Metadata.query.filter_by(key=key).one().value @functools.cache # Very poor performance if result is not cached! def reporting_commodity(): """Get the native reporting commodity""" return Metadata.get('reporting_commodity')