2022-12-24 12:31:45 +11:00
# DrCr: Web-based double-entry bookkeeping framework
2024-11-09 18:14:25 +11:00
# Copyright (C) 2022–2024 Lee Yingtong Li (RunasSudo)
2022-12-24 12:31:45 +11:00
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2022-12-25 17:05:39 +11:00
from markupsafe import Markup
2022-12-24 12:31:45 +11:00
from . import AMOUNT_DPS
2023-01-02 18:08:30 +11:00
from . database import db
2022-12-24 12:31:45 +11:00
2024-11-09 18:14:25 +11:00
import functools
2023-01-04 00:27:44 +11:00
from itertools import groupby
2023-01-02 18:08:30 +11:00
class Transaction(db.Model):
    """A dated, described journal entry consisting of a list of Postings"""

    __tablename__ = 'transactions'

    id = db.Column(db.Integer, primary_key=True)
    dt = db.Column(db.DateTime)
    description = db.Column(db.String)

    # Postings belong to exactly this Transaction: the cascade removes them
    # together with the Transaction, or when they are detached from it
    postings = db.relationship('Posting', back_populates='transaction', cascade='all, delete-orphan')

    def __init__(self, dt=None, description=None, postings=None):
        self.dt = dt
        self.description = description
        # A missing (or empty) postings argument becomes a fresh list
        self.postings = postings if postings else []

    def assert_valid(self):
        """
        Assert that debits equal credits

        Every posting is converted to the reporting commodity at cost.
        Positive quantities are totalled as debits and negative quantities
        as credits; AssertionError is raised if the two totals differ.
        """

        costs = [posting.amount().as_cost().quantity for posting in self.postings]

        total_dr = sum(cost for cost in costs if cost > 0)
        total_cr = -sum(cost for cost in costs if cost < 0)

        if total_dr == total_cr:
            return

        raise AssertionError('Transaction debits ({}) and credits ({}) do not balance'.format(total_dr, total_cr))
2022-12-24 12:31:45 +11:00
2023-01-02 18:08:30 +11:00
class Posting(db.Model):
    """A single line of a Transaction: a quantity of a commodity posted to an account"""

    __tablename__ = 'postings'

    id = db.Column(db.Integer, primary_key=True)
    transaction_id = db.Column(db.Integer, db.ForeignKey('transactions.id'))

    description = db.Column(db.String)
    account = db.Column(db.String)
    quantity = db.Column(db.Integer)
    commodity = db.Column(db.String)

    # Running balance of the account in units of reporting_commodity
    # Only takes into consideration Transactions stored in database, not API-generated ones
    running_balance = db.Column(db.Integer)

    transaction = db.relationship('Transaction', back_populates='postings')

    def __init__(self, description=None, account=None, quantity=None, commodity=None, running_balance=None):
        self.description = description
        self.account = account
        self.quantity = quantity
        self.commodity = commodity
        self.running_balance = running_balance

    def amount(self):
        """Return this posting's quantity and commodity wrapped in a new Amount"""
        return Amount(quantity=self.quantity, commodity=self.commodity)
2024-11-09 23:41:42 +11:00
def queue_invalidate_running_balances(account, dt_from):
    """
    Invalidate running_balances for Postings in the specified account, from the given date onwards

    Each matching Posting has its running_balance reset to None.

    NOTE: Does not call db.session.commit()
    """

    # Postings of this account whose Transaction is dated dt_from or later
    is_affected = (Transaction.dt >= dt_from) & (Posting.account == account)
    query = db.select(Posting).join(Posting.transaction).where(is_affected)

    for stale_posting in db.session.scalars(query).all():
        stale_posting.running_balance = None
2022-12-24 12:31:45 +11:00
class Amount:
    """
    A quantity of a single commodity

    quantity is an integer scaled by 10**AMOUNT_DPS. commodity is a string; for
    commodities other than the reporting commodity it may embed a cost basis,
    either '{unit cost}' or '{{total cost}}' (see as_cost).
    """

    __slots__ = ['quantity', 'commodity']

    def __init__(self, quantity, commodity):
        self.quantity = quantity
        self.commodity = commodity

    @classmethod
    def parse(cls, amount_str):
        """
        Parse 'QUANTITY' or 'QUANTITY COMMODITY' into an Amount

        Everything after the first space is taken as the commodity. If there is
        no commodity, the reporting commodity is used. Raises ValueError (from
        float) if the quantity is not a number.
        """

        # Strip first: a trailing space would otherwise yield an empty-string
        # commodity, and a leading space an empty quantity
        quantity_str, _, commodity = amount_str.strip().partition(' ')

        quantity = round(float(quantity_str) * (10 ** AMOUNT_DPS))

        if not commodity:
            # Default commodity
            commodity = reporting_commodity()

        return cls(quantity, commodity)

    def __repr__(self):
        return '<{}: {}>'.format(self.__class__.__name__, self.format('force'))

    def __abs__(self):
        return Amount(abs(self.quantity), self.commodity)

    def __neg__(self):
        return Amount(-self.quantity, self.commodity)

    def __add__(self, other):
        """Add two Amounts of the same commodity; raises ValueError if the commodities differ"""
        if not isinstance(other, Amount):
            # Let Python raise its usual TypeError for unsupported operands
            return NotImplemented
        if self.commodity != other.commodity:
            raise ValueError('Cannot add incompatible commodities {} and {}'.format(self.commodity, other.commodity))
        return Amount(self.quantity + other.quantity, self.commodity)

    def __sub__(self, other):
        if not isinstance(other, Amount):
            return NotImplemented
        return self + (-other)

    def clone(self):
        """Return a new Amount with the same quantity and commodity"""
        return Amount(self.quantity, self.commodity)

    @staticmethod
    def _fixed_point(quantity, grouped):
        """Render a scaled integer quantity with AMOUNT_DPS decimal places, optionally with ',' thousands grouping"""
        spec = '{:,.{dps}f}' if grouped else '{:.{dps}f}'
        return spec.format(quantity / (10 ** AMOUNT_DPS), dps=AMOUNT_DPS)

    def format(self, commodity='non_reporting'):
        """
        Format the Amount for display as Markup, with thousands grouping

        commodity must be one of 'non_reporting', 'force' or 'hide', otherwise
        ValueError is raised. A single-character commodity is placed before
        the number; any longer commodity is placed after it.
        """

        if commodity not in ('non_reporting', 'force', 'hide'):
            raise ValueError('Invalid commodity reporting option')

        number = self._fixed_point(self.quantity, grouped=True)

        # NOTE(review): 'force' is handled exactly like 'non_reporting' here, so the
        # reporting commodity is never displayed - confirm that this is intended
        if (self.commodity == reporting_commodity() and commodity in ('non_reporting', 'force')) or commodity == 'hide':
            text = number
        elif len(self.commodity) == 1:
            text = self.commodity + number
        else:
            text = number + ' ' + self.commodity

        # Grouping commas (anywhere in the text) become narrow no-break spaces
        # NOTE(review): the separator literals in this class were whitespace-only in the
        # reviewed copy and may originally have been HTML entities - confirm before merging
        return Markup(text.replace(',', '\u202f'))

    def format_accounting(self, link=None):
        """
        Format the quantity in accounting style as Markup, without the commodity

        Negative quantities are shown in parentheses. If link is given, the
        number is wrapped in an anchor pointing to it.
        """

        # NOTE(review): both padding literals were whitespace-only in the reviewed copy.
        # Presumably non-negative values are padded with a no-break space to line up with
        # the closing parenthesis of negative values - confirm against the repository
        if self.quantity >= 0:
            text = self._fixed_point(self.quantity, grouped=True).replace(',', '\u202f')
            space = '\u00a0'
        else:
            text = '({})'.format(self._fixed_point(-self.quantity, grouped=True)).replace(',', '\u202f')
            space = ''

        if link is None:
            return Markup(text + space)

        return Markup('<a href="{}" class="hover:text-blue-700 hover:underline">{}</a>{}'.format(link, text, space))

    def quantity_string(self):
        """Format the Amount as plain text without thousands grouping; the reporting commodity is omitted"""

        number = self._fixed_point(self.quantity, grouped=False)

        if self.commodity == reporting_commodity():
            return number
        if len(self.commodity) == 1:
            return self.commodity + number
        return number + ' ' + self.commodity

    def as_cost(self):
        """
        Convert commodity to reporting currency in cost basis

        Returns self unchanged if already in the reporting commodity. Raises
        Exception if the commodity carries no cost basis.
        """

        if self.commodity == reporting_commodity():
            return self

        # TODO: Refactor this
        if '{{' in self.commodity:
            # '{{...}}' holds the total cost, which takes its sign from the quantity
            cost = float(self.commodity[self.commodity.index('{{') + 2:self.commodity.index('}}')])
            if self.quantity < 0:
                cost = -cost
            return Amount(round(cost * (10 ** AMOUNT_DPS)), reporting_commodity())

        if '{' in self.commodity:
            # '{...}' holds the unit cost; quantity is already scaled by 10**AMOUNT_DPS
            cost = float(self.commodity[self.commodity.index('{') + 1:self.commodity.index('}')])
            return Amount(round(cost * self.quantity), reporting_commodity())  # FIXME: Custom reporting currency

        raise Exception('No cost base for commodity {}'.format(self.commodity))
2022-12-24 15:14:19 +11:00
2022-12-25 17:31:31 +11:00
class Balance:
    """
    A collection of Amount's

    Amounts are merged by commodity as they are added (see add).
    """

    def __init__(self):
        self.amounts = []

    def clone(self):
        """Return a new Balance holding a clone of each Amount"""
        balance = Balance()
        balance.amounts = [a.clone() for a in self.amounts]
        return balance

    def add(self, rhs):
        """
        Add the Amount rhs to the Balance

        If an Amount of the same commodity is already held, its quantity is
        increased; otherwise a copy of rhs is appended.
        """

        for amount in self.amounts:
            if amount.commodity == rhs.commodity:
                amount.quantity += rhs.quantity
                return

        # Store a copy: the stored Amount is mutated in place by later calls
        # to add(), which must not change the object the caller passed in
        self.amounts.append(rhs.clone())

    def clean(self):
        """Remove zero amounts"""
        self.amounts = [a for a in self.amounts if a.quantity != 0]
2022-12-24 15:14:19 +11:00
class TrialBalancer:
    """
    Applies transactions to generate a trial balance

    accounts maps each account name to an Amount in the reporting commodity.
    """

    # Template for reading the cached running_balance of each account:
    # First SELECT the last applicable dt by account
    # Then, among the transactions with that dt, SELECT the last applicable transaction_id
    # Then extract the running_balance for each account at that transaction_id
    # {date_filter} is replaced by a fixed WHERE clause from this class (never user input);
    # dates themselves are passed as bound parameters
    # NOTE(review): if a transaction has several postings to the same account, the final JOIN
    # returns one row per posting and the query does not define which one is read last
    _RUNNING_BALANCES_SQL = '''
        SELECT p3.account, running_balance FROM
        (
            SELECT p1.account, max(p2.transaction_id) AS max_tid FROM
            (
                SELECT account, max(dt) AS max_dt FROM postings JOIN transactions ON postings.transaction_id = transactions.id {date_filter} GROUP BY account
            ) p1
            JOIN postings p2 ON p1.account = p2.account AND p1.max_dt = transactions.dt JOIN transactions ON p2.transaction_id = transactions.id GROUP BY p2.account
        ) p3
        JOIN postings p4 ON p3.account = p4.account AND p3.max_tid = p4.transaction_id
    '''

    def __init__(self):
        self.accounts = {}

    @classmethod
    def from_cached(cls, start_date=None, end_date=None):
        """
        Obtain a TrialBalancer based on the cached running_balance

        Only Postings stored in the database are considered. Any running_balance
        which has been invalidated is recomputed (and committed) first.

        If end_date is given, balances are taken as at the end of that date,
        otherwise the latest balances are used. If start_date is given, the
        balances immediately before that date are subtracted, so the result is
        the movement from start_date onwards.
        """

        cls._recompute_stale_running_balances()

        # NB: We need to specify DATE(...) otherwise SQLite appears to do a string comparison which doesn't work properly with SQLAlchemy's prepared statement?
        if end_date is None:
            result = cls._from_running_balances()
        else:
            result = cls._from_running_balances('WHERE DATE(dt) <= DATE(:end_date)', {'end_date': end_date})

        # Subtract balances at start_date from balances at end_date if required
        if start_date is not None:
            opening = cls._from_running_balances('WHERE DATE(dt) < DATE(:start_date)', {'start_date': start_date})
            for account, amount in result.accounts.items():
                # If account not in opening, then the balance at start_date was necessarily 0 and subtraction is not required
                if account in opening.accounts:
                    amount.quantity -= opening.accounts[account].quantity

        return result

    @staticmethod
    def _recompute_stale_running_balances():
        """Recompute and commit running_balance for every account having a Posting whose running_balance is NULL"""

        # Raw SQL must be wrapped in text(): SQLAlchemy 2.0 does not execute plain strings
        stale_accounts = db.session.scalars(db.text('SELECT DISTINCT account FROM postings WHERE running_balance IS NULL')).all()
        if not stale_accounts:
            return

        # Get all relevant Postings in database in correct order
        # FIXME: Recompute balances only from the last non-stale balance to be more efficient
        postings = db.session.scalars(
            db.select(Posting)
            .join(Posting.transaction)
            .where(Posting.account.in_(stale_accounts))
            .order_by(Transaction.dt, Transaction.id)
        ).all()

        # Account name -> balance so far, in units of the reporting commodity
        balances = {}
        for posting in postings:
            # FIXME: Handle commodities better (ensure compatible commodities)
            balance = balances.get(posting.account, 0) + posting.amount().as_cost().quantity
            balances[posting.account] = balance
            posting.running_balance = balance

        db.session.commit()

    @classmethod
    def _from_running_balances(cls, date_filter='', params=None):
        """Build a TrialBalancer from the last cached running_balance of each account, restricted by date_filter"""

        statement = db.text(cls._RUNNING_BALANCES_SQL.format(date_filter=date_filter))

        result = cls()
        for row in db.session.execute(statement, params).all():
            result.accounts[row.account] = Amount(row.running_balance, reporting_commodity())
        return result

    def apply_transactions(self, transactions):
        """Add the cost of every posting of the given transactions to the balance of its account"""

        for transaction in transactions:
            for posting in transaction.postings:
                if posting.account not in self.accounts:
                    self.accounts[posting.account] = Amount(0, reporting_commodity())

                # FIXME: Handle commodities better (ensure compatible commodities)
                self.accounts[posting.account].quantity += posting.amount().as_cost().quantity

    def transfer_balance(self, source_account, destination_account, description=None):
        """
        Transfer the balance of the source account to the destination account

        The source account is removed afterwards. Nothing happens if the two
        accounts are the same, or if the source account has no balance.
        description is currently unused.
        """

        # TODO: Keep a record of internal transactions?

        if source_account == destination_account:
            # Don't do anything in this case!!
            return

        if source_account not in self.accounts:
            return

        if destination_account not in self.accounts:
            self.accounts[destination_account] = Amount(0, reporting_commodity())

        # FIXME: Handle commodities
        self.accounts[destination_account].quantity += self.accounts[source_account].quantity
        del self.accounts[source_account]
2023-01-03 18:37:10 +11:00
class AccountConfiguration(db.Model):
    """A configuration entry (kind plus JSON data) attached to an account"""

    __tablename__ = 'account_configurations'

    id = db.Column(db.Integer, primary_key=True)
    account = db.Column(db.String)
    kind = db.Column(db.String)
    data = db.Column(db.JSON)

    @staticmethod
    def get_all():
        """Return a dict mapping each account to the list of its AccountConfigurations"""

        # Rows are ordered by account so that groupby sees each account as one run
        query = db.select(AccountConfiguration).order_by(AccountConfiguration.account)
        rows = db.session.execute(query).scalars()

        by_account = {}
        for account, configurations in groupby(rows, lambda config: config.account):
            by_account[account] = list(configurations)
        return by_account

    @staticmethod
    def get_all_kinds():
        """Return a dict mapping each account to the list of kinds configured for it"""

        kinds = {}
        for account, configurations in AccountConfiguration.get_all().items():
            kinds[account] = [configuration.kind for configuration in configurations]
        return kinds
2023-07-03 22:31:58 +10:00
2024-11-09 18:14:25 +11:00
# ----------------
# Metadata helpers
2023-07-03 22:31:58 +10:00
class Metadata(db.Model):
    """A key-value pair of string metadata stored in the database"""

    __tablename__ = 'metadata'

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String)
    value = db.Column(db.String)

    @staticmethod
    def get(key):
        """Return the value stored under key; the lookup uses one(), so exactly one matching row must exist"""
        row = Metadata.query.filter_by(key=key).one()
        return row.value
2024-11-09 18:14:25 +11:00
@functools.cache  # Very poor performance if result is not cached!
def reporting_commodity():
    """
    Get the native reporting commodity

    The value is read from the 'reporting_commodity' Metadata entry on the
    first call and served from the cache on every later call.
    """
    commodity = Metadata.get('reporting_commodity')
    return commodity