270 lines
8.6 KiB
Python
270 lines
8.6 KiB
Python
# DrCr: Web-based double-entry bookkeeping framework
|
|
# Copyright (C) 2022–2024 Lee Yingtong Li (RunasSudo)
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
from markupsafe import Markup
|
|
|
|
from . import AMOUNT_DPS
|
|
from .database import db
|
|
|
|
import functools
|
|
from itertools import groupby
|
|
|
|
class Transaction(db.Model):
|
|
__tablename__ = 'transactions'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
|
|
dt = db.Column(db.DateTime)
|
|
description = db.Column(db.String)
|
|
|
|
postings = db.relationship('Posting', back_populates='transaction', cascade='all, delete-orphan')
|
|
|
|
def __init__(self, dt=None, description=None, postings=None):
|
|
self.dt = dt
|
|
self.description = description
|
|
self.postings = postings or []
|
|
|
|
def assert_valid(self):
|
|
"""Assert that debits equal credits"""
|
|
|
|
total_dr = 0
|
|
total_cr = 0
|
|
|
|
for posting in self.postings:
|
|
amount_cost = posting.amount().as_cost().quantity
|
|
if amount_cost > 0:
|
|
total_dr += amount_cost
|
|
elif amount_cost < 0:
|
|
total_cr -= amount_cost
|
|
|
|
if total_dr != total_cr:
|
|
raise AssertionError('Transaction debits ({}) and credits ({}) do not balance'.format(total_dr, total_cr))
|
|
|
|
class Posting(db.Model):
|
|
__tablename__ = 'postings'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
transaction_id = db.Column(db.Integer, db.ForeignKey('transactions.id'))
|
|
|
|
description = db.Column(db.String)
|
|
account = db.Column(db.String)
|
|
quantity = db.Column(db.Integer)
|
|
commodity = db.Column(db.String)
|
|
|
|
transaction = db.relationship('Transaction', back_populates='postings')
|
|
|
|
def __init__(self, description=None, account=None, quantity=None, commodity=None):
|
|
self.description = description
|
|
self.account = account
|
|
self.quantity = quantity
|
|
self.commodity = commodity
|
|
|
|
def amount(self):
|
|
return Amount(self.quantity, self.commodity)
|
|
|
|
class Amount:
|
|
__slots__ = ['quantity', 'commodity']
|
|
|
|
def __init__(self, quantity, commodity):
|
|
self.quantity = quantity
|
|
self.commodity = commodity
|
|
|
|
@classmethod
|
|
def parse(self, amount_str):
|
|
if ' ' not in amount_str:
|
|
# Default commodity
|
|
quantity = round(float(amount_str) * (10**AMOUNT_DPS))
|
|
return Amount(quantity, reporting_commodity())
|
|
|
|
quantity_str = amount_str[:amount_str.index(' ')]
|
|
quantity = round(float(quantity_str) * (10**AMOUNT_DPS))
|
|
|
|
commodity = amount_str[amount_str.index(' ')+1:]
|
|
return Amount(quantity, commodity)
|
|
|
|
def __repr__(self):
|
|
return '<{}: {}>'.format(self.__class__.__name__, self.format('force'))
|
|
|
|
def __abs__(self):
|
|
return Amount(abs(self.quantity), self.commodity)
|
|
|
|
def __neg__(self):
|
|
return Amount(-self.quantity, self.commodity)
|
|
|
|
def __add__(self, other):
|
|
if self.commodity != other.commodity:
|
|
raise ValueError('Cannot add incompatible commodities {} and {}'.format(self.commodity, other.commodity))
|
|
return Amount(self.quantity + other.quantity, self.commodity)
|
|
|
|
def __sub__(self, other):
|
|
return self + (-other)
|
|
|
|
def clone(self):
|
|
return Amount(self.quantity, self.commodity)
|
|
|
|
def format(self, commodity='non_reporting'):
|
|
if commodity not in ('non_reporting', 'force', 'hide'):
|
|
raise ValueError('Invalid commodity reporting option')
|
|
|
|
if (self.commodity == reporting_commodity() and commodity in ('non_reporting', 'force')) or commodity == 'hide':
|
|
return Markup('{:,.{dps}f}'.format(self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' '))
|
|
elif len(self.commodity) == 1:
|
|
return Markup('{0}{1:,.{dps}f}'.format(self.commodity, self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' '))
|
|
else:
|
|
return Markup('{1:,.{dps}f} {0}'.format(self.commodity, self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' '))
|
|
|
|
def format_accounting(self, link=None):
|
|
if self.quantity >= 0:
|
|
text = '{:,.{dps}f}'.format(self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' ')
|
|
space = ' '
|
|
else:
|
|
text = '({:,.{dps}f})'.format(-self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS).replace(',', ' ')
|
|
space = ''
|
|
|
|
if link is None:
|
|
return Markup(text + space)
|
|
else:
|
|
return Markup('<a href="{}" class="hover:text-blue-700 hover:underline">{}</a>{}'.format(link, text, space))
|
|
|
|
def quantity_string(self):
|
|
if self.commodity == reporting_commodity():
|
|
return '{:.{dps}f}'.format(self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS)
|
|
elif len(self.commodity) == 1:
|
|
return '{0}{1:.{dps}f}'.format(self.commodity, self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS)
|
|
else:
|
|
return '{1:.{dps}f} {0}'.format(self.commodity, self.quantity / (10**AMOUNT_DPS), dps=AMOUNT_DPS)
|
|
|
|
def as_cost(self):
|
|
"""Convert commodity to reporting currency in cost basis"""
|
|
|
|
if self.commodity == reporting_commodity():
|
|
return self
|
|
|
|
# TODO: Refactor this
|
|
|
|
if '{{' in self.commodity:
|
|
cost = float(self.commodity[self.commodity.index('{{')+2:self.commodity.index('}}')])
|
|
if self.quantity < 0:
|
|
cost = -cost
|
|
return Amount(round(cost * (10**AMOUNT_DPS)), reporting_commodity())
|
|
elif '{' in self.commodity:
|
|
cost = float(self.commodity[self.commodity.index('{')+1:self.commodity.index('}')])
|
|
return Amount(round(cost * self.quantity), reporting_commodity()) # FIXME: Custom reporting currency
|
|
else:
|
|
raise Exception('No cost base for commodity {}'.format(self.commodity))
|
|
|
|
class Balance:
|
|
"""A collection of Amount's"""
|
|
|
|
def __init__(self):
|
|
self.amounts = []
|
|
|
|
def clone(self):
|
|
balance = Balance()
|
|
balance.amounts = [a.clone() for a in self.amounts]
|
|
return balance
|
|
|
|
def add(self, rhs):
|
|
amount = next((a for a in self.amounts if a.commodity == rhs.commodity), None)
|
|
if amount is None:
|
|
self.amounts.append(rhs)
|
|
else:
|
|
amount.quantity += rhs.quantity
|
|
|
|
def clean(self):
|
|
"""Remove zero amounts"""
|
|
self.amounts = [a for a in self.amounts if a.quantity != 0]
|
|
|
|
class TrialBalancer:
|
|
"""
|
|
Applies transactions to generate a trial balance
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.accounts = {}
|
|
|
|
def apply_transactions(self, transactions):
|
|
for transaction in transactions:
|
|
for posting in transaction.postings:
|
|
if posting.account not in self.accounts:
|
|
self.accounts[posting.account] = Amount(0, reporting_commodity())
|
|
|
|
# FIXME: Handle commodities better
|
|
self.accounts[posting.account].quantity += posting.amount().as_cost().quantity
|
|
|
|
def transfer_balance(self, source_account, destination_account, description=None):
|
|
"""Transfer the balance of the source account to the destination account"""
|
|
|
|
# TODO: Keep a record of internal transactions?
|
|
|
|
if source_account == destination_account:
|
|
# Don't do anything in this case!!
|
|
return
|
|
|
|
if source_account not in self.accounts:
|
|
return
|
|
|
|
if destination_account not in self.accounts:
|
|
self.accounts[destination_account] = Amount(0, reporting_commodity())
|
|
|
|
# FIXME: Handle commodities
|
|
self.accounts[destination_account].quantity += self.accounts[source_account].quantity
|
|
del self.accounts[source_account]
|
|
|
|
class AccountConfiguration(db.Model):
|
|
__tablename__ = 'account_configurations'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
|
|
account = db.Column(db.String)
|
|
kind = db.Column(db.String)
|
|
data = db.Column(db.JSON)
|
|
|
|
@staticmethod
|
|
def get_all():
|
|
account_configurations = db.session.execute(db.select(AccountConfiguration).order_by(AccountConfiguration.account)).scalars()
|
|
account_configurations = {v: list(g) for v, g in groupby(account_configurations, lambda c: c.account)}
|
|
|
|
return account_configurations
|
|
|
|
@staticmethod
|
|
def get_all_kinds():
|
|
account_configurations = AccountConfiguration.get_all()
|
|
kinds = {k: [vv.kind for vv in v] for k, v in account_configurations.items()}
|
|
|
|
return kinds
|
|
|
|
# ----------------
|
|
# Metadata helpers
|
|
|
|
class Metadata(db.Model):
|
|
__tablename__ = 'metadata'
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
|
|
key = db.Column(db.String)
|
|
value = db.Column(db.String)
|
|
|
|
@staticmethod
|
|
def get(key):
|
|
return Metadata.query.filter_by(key=key).one().value
|
|
|
|
@functools.cache # Very poor performance if result is not cached!
|
|
def reporting_commodity():
|
|
"""Get the native reporting commodity"""
|
|
|
|
return Metadata.get('reporting_commodity')
|