Rewrite backend again to process transactions fully ourselves

Reimplemented up to trial balance/account transactions
This commit is contained in:
RunasSudo 2020-03-20 21:43:05 +11:00
parent be82615598
commit 2bc6bb0e22
Signed by: RunasSudo
GPG Key ID: 7234E476BF21C61A
10 changed files with 534 additions and 306 deletions

View File

@ -1,7 +1,7 @@
# Set up how we will call Ledger # Set up how we will call Ledger
ledger_file: /path/to/ledger.journal ledger_file: /path/to/ledger.journal
ledger_args: ['--pedantic', '--recursive-aliases'] ledger_args: ['--pedantic', '--recursive-aliases']
report_currency: $ report_currency: ['$', True] # True if prefix, False if suffix
# Tell ledger-pyreport about the top-level account categories # Tell ledger-pyreport about the top-level account categories
assets_account: Assets assets_account: Assets

View File

@ -16,6 +16,8 @@
from . import accounting from . import accounting
from . import ledger from . import ledger
from .config import config
from .model import *
from datetime import datetime, timedelta from datetime import datetime, timedelta
from decimal import Decimal from decimal import Decimal
@ -39,36 +41,45 @@ def trial():
if compare == 0: if compare == 0:
# Get trial balance # Get trial balance
trial_balance = ledger.trial_balance(date, pstart) l = ledger.raw_transactions_at_date(date)
trial_balance = accounting.trial_balance(l, date, pstart)
total_dr = Decimal(0) report_currency = Currency(*config['report_currency'])
total_cr = Decimal(0) trial_balance = accounting.add_unrealized_gains(trial_balance, report_currency)
for account in trial_balance.accounts.values(): total_dr = Amount(0, report_currency)
balance = trial_balance.get_balance(account.name) total_cr = Amount(0, report_currency)
for account in l.accounts.values():
# Display in "cost basis" as we have already accounted for unrealised gains
balance = trial_balance.get_balance(account).exchange(report_currency, True)
if balance > 0: if balance > 0:
total_dr += balance total_dr += balance
else: else:
total_cr -= balance total_cr -= balance
return flask.render_template('trial.html', trial_balance=trial_balance, total_dr=total_dr, total_cr=total_cr) return flask.render_template('trial.html', date=date, pstart=pstart, trial_balance=trial_balance, accounts=sorted(l.accounts.values(), key=lambda a: a.name), total_dr=total_dr, total_cr=total_cr, report_currency=report_currency)
else: else:
# Get multiple trial balances for comparison # Get multiple trial balances for comparison
dates = [date.replace(year=date.year - i) for i in range(0, compare + 1)] dates = [date.replace(year=date.year - i) for i in range(0, compare + 1)]
pstarts = [pstart.replace(year=pstart.year - i) for i in range(0, compare + 1)] pstarts = [pstart.replace(year=pstart.year - i) for i in range(0, compare + 1)]
trial_balances = [ledger.trial_balance(d, p) for d, p in zip(dates, pstarts)] report_currency = Currency(*config['report_currency'])
l = ledger.raw_transactions_at_date(date)
trial_balances = [accounting.add_unrealized_gains(accounting.trial_balance(l, d, p), report_currency) for d, p in zip(dates, pstarts)]
# Delete accounts with always zero balances # Delete accounts with always zero balances
accounts = list(trial_balances[0].accounts.values()) accounts = list(trial_balances[0].ledger.accounts.values())
for account in accounts[:]: for account in accounts[:]:
if all(t.get_balance(account.name) == 0 for t in trial_balances): if all(t.get_balance(account) == 0 for t in trial_balances):
accounts.remove(account) accounts.remove(account)
return flask.render_template('trial_multiple.html', trial_balances=trial_balances, accounts=accounts) return flask.render_template('trial_multiple.html', trial_balances=trial_balances, accounts=sorted(accounts, key=lambda a: a.name), report_currency=report_currency)
@app.route('/balance') @app.route('/balance')
def balance(): def balance():
raise Exception('NYI')
date = datetime.strptime(flask.request.args['date'], '%Y-%m-%d') date = datetime.strptime(flask.request.args['date'], '%Y-%m-%d')
pstart = datetime.strptime(flask.request.args['pstart'], '%Y-%m-%d') pstart = datetime.strptime(flask.request.args['pstart'], '%Y-%m-%d')
compare = int(flask.request.args['compare']) compare = int(flask.request.args['compare'])
@ -89,6 +100,8 @@ def balance():
@app.route('/pandl') @app.route('/pandl')
def pandl(): def pandl():
raise Exception('NYI')
date_beg = datetime.strptime(flask.request.args['date_beg'], '%Y-%m-%d') date_beg = datetime.strptime(flask.request.args['date_beg'], '%Y-%m-%d')
date_end = datetime.strptime(flask.request.args['date_end'], '%Y-%m-%d') date_end = datetime.strptime(flask.request.args['date_end'], '%Y-%m-%d')
compare = int(flask.request.args['compare']) compare = int(flask.request.args['compare'])
@ -118,36 +131,45 @@ def pandl():
def transactions(): def transactions():
date = datetime.strptime(flask.request.args['date'], '%Y-%m-%d') date = datetime.strptime(flask.request.args['date'], '%Y-%m-%d')
pstart = datetime.strptime(flask.request.args['pstart'], '%Y-%m-%d') pstart = datetime.strptime(flask.request.args['pstart'], '%Y-%m-%d')
account = flask.request.args.get('account', None)
trial_balance_pstart = ledger.trial_balance(pstart, pstart) # General ledger
account = trial_balance_pstart.get_account(flask.request.args['account']) l = ledger.raw_transactions_at_date(date)
opening_balance = trial_balance_pstart.get_balance(account.name)
balance = opening_balance # Unrealized gains
transactions = account.get_transactions(date, pstart) report_currency = Currency(*config['report_currency'])
for transaction in transactions: l = accounting.add_unrealized_gains(accounting.trial_balance(l, date, pstart), report_currency).ledger
for posting in transaction.postings[:]:
if posting.account == account.name:
transaction.postings.remove(posting)
else:
posting.amount = -posting.amount # In terms of effect on this account
balance += posting.amount
posting.balance = balance
trial_balance = ledger.trial_balance(date, pstart) if not account:
closing_balance = trial_balance.get_balance(account.name) transactions = [t for t in l.transactions if t.date <= date and t.date >= pstart]
return flask.render_template('transactions.html', date=date, pstart=pstart, account=account, transactions=transactions, opening_balance=opening_balance, closing_balance=closing_balance) total_dr = sum((p.amount for t in transactions for p in t.postings if p.amount > 0), Balance()).exchange(report_currency, True)
total_cr = sum((p.amount for t in transactions for p in t.postings if p.amount < 0), Balance()).exchange(report_currency, True)
return flask.render_template('transactions.html', date=date, pstart=pstart, account=None, ledger=l, transactions=transactions, total_dr=total_dr, total_cr=total_cr, report_currency=report_currency)
else:
account = l.get_account(account)
transactions = [t for t in l.transactions if t.date <= date and t.date >= pstart and any(p.account == account for p in t.postings)]
opening_balance = accounting.trial_balance(l, pstart, pstart).get_balance(account).exchange(report_currency, True)
closing_balance = accounting.trial_balance(l, date, pstart).get_balance(account).exchange(report_currency, True)
return flask.render_template('transactions.html', date=date, pstart=pstart, account=account, ledger=l, transactions=transactions, opening_balance=opening_balance, closing_balance=closing_balance, report_currency=report_currency)
@app.template_filter('a') @app.template_filter('a')
def filter_amount(amt): def filter_amount(amt):
if amt < 0.005 and amt >= -0.005: if amt.amount < 0.005 and amt.amount >= -0.005:
return flask.Markup('0.00&nbsp;') return flask.Markup('0.00&nbsp;')
elif amt > 0: elif amt > 0:
return flask.Markup('{:,.2f}&nbsp;'.format(amt).replace(',', '&#8239;')) # Narrow no-break space return flask.Markup('{:,.2f}&nbsp;'.format(amt.amount).replace(',', '&#8239;')) # Narrow no-break space
else: else:
return flask.Markup('({:,.2f})'.format(-amt).replace(',', '&#8239;')) return flask.Markup('({:,.2f})'.format(-amt.amount).replace(',', '&#8239;'))
@app.template_filter('b') @app.template_filter('b')
def filter_amount_positive(amt): def filter_amount_positive(amt):
return flask.Markup('{:,.2f}'.format(amt).replace(',', '&#8239;')) return flask.Markup('{:,.2f}'.format(amt.amount).replace(',', '&#8239;'))
#return flask.Markup('{:,.2f} {}'.format(amt.amount, amt.currency.name).replace(',', '&#8239;'))
@app.template_filter('bb')
def filter_balance_positive(balance):
return flask.Markup('<br>'.join(filter_amount_positive(a) for a in balance.amounts))

View File

@ -17,18 +17,36 @@
import csv import csv
from decimal import Decimal from decimal import Decimal
from . import ledger from .model import *
# Generate balance sheet def add_unrealized_gains(tb, currency):
for account in list(tb.ledger.accounts.values()):
if not account.is_market:
continue
def balance_sheet(date, pstart): total_cost = tb.get_balance(account).exchange(currency, True)
# Get trial balance total_market = tb.get_balance(account).exchange(currency, False, tb.date, tb.ledger)
trial_balance = ledger.trial_balance(date, pstart) unrealized_gain = total_market - total_cost
# Calculate Profit/Loss if unrealized_gain != 0:
total_pandl = trial_balance.get_total(ledger.config['income_account']) + trial_balance.get_total(ledger.config['expenses_account']) transaction = Transaction(tb.ledger, None, tb.date, '<Unrealized Gains>')
transaction.postings.append(Posting(transaction, account, unrealized_gain))
transaction.postings.append(Posting(transaction, tb.ledger.get_account(config['unrealized_gains']), -unrealized_gain))
tb.ledger.transactions.append(transaction)
# Add Current Year Earnings account return trial_balance(tb.ledger, tb.date, tb.pstart)
trial_balance.set_balance(ledger.config['current_year_earnings'], trial_balance.get_balance(ledger.config['current_year_earnings']) + total_pandl)
return trial_balance def trial_balance(ledger, date, pstart):
tb = TrialBalance(ledger, date, pstart)
for transaction in ledger.transactions:
if transaction.date > date:
continue
for posting in transaction.postings:
if (posting.account.is_income or posting.account.is_expense) and transaction.date < pstart:
tb.balances[config['retained_earnings']] = tb.get_balance(ledger.get_account(config['retained_earnings'])) + posting.amount
else:
tb.balances[posting.account.name] = tb.get_balance(posting.account) + posting.amount
return tb

20
ledger_pyreport/config.py Normal file
View File

@ -0,0 +1,20 @@
# ledger-pyreport
# Copyright © 2020 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import yaml
with open('config.yml', 'r') as f:
config = yaml.safe_load(f)

View File

@ -47,6 +47,13 @@
<label>Compare <input name="compare" value="0" style="width: 2em;" oninput="txtc(this)"> periods</label> <label>Compare <input name="compare" value="0" style="width: 2em;" oninput="txtc(this)"> periods</label>
{#<label><input name="cash" type="checkbox" oninput="chbc(this)"> Cash basis</label>#} {#<label><input name="cash" type="checkbox" oninput="chbc(this)"> Cash basis</label>#}
</form></li> </form></li>
<li><form action="{{ url_for('transactions') }}">
<button type="submit">General ledger</button>
<label>Date: <input name="date" value="{{ date.strftime('%Y-%m-%d') }}" style="width: 6em;" oninput="txtc(this)"></label>
<label>Period start: <input name="pstart" value="{{ pstart.strftime('%Y-%m-%d') }}" style="width: 6em;" oninput="txtc(this)"></label>
{#<label><input name="cash" type="checkbox" oninput="chbc(this)"> Cash basis</label>#}
</form></li>
</ul> </ul>
<script> <script>

View File

@ -20,13 +20,17 @@
<html> <html>
<head> <head>
<meta charset="utf-8"> <meta charset="utf-8">
<title>Account Transactions for {{ account.name }} as at {{ date.strftime('%d %B %Y') }}</title> <title>{% if account %}Account Transactions{% else %}General Ledger{% endif %} as at {{ date.strftime('%d %B %Y') }}</title>
<link rel="stylesheet" type="text/css" href="https://cdnjs.cloudflare.com/ajax/libs/normalize/8.0.1/normalize.min.css" integrity="sha256-l85OmPOjvil/SOvVt3HnSSjzF1TUMyT9eV0c2BzEGzU=" crossorigin="anonymous"> <link rel="stylesheet" type="text/css" href="https://cdnjs.cloudflare.com/ajax/libs/normalize/8.0.1/normalize.min.css" integrity="sha256-l85OmPOjvil/SOvVt3HnSSjzF1TUMyT9eV0c2BzEGzU=" crossorigin="anonymous">
<link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='main.css') }}"> <link rel="stylesheet" type="text/css" href="{{ url_for('static', filename='main.css') }}">
</head> </head>
<body> <body>
<h1>Account Transactions</h1> {% if account %}
<h2 style="margin-bottom: 0;">For {{ account.name }}</h2> <h1>Account Transactions</h1>
<h2 style="margin-bottom: 0;">For {{ account.name }}</h2>
{% else %}
<h1>General Ledger</h1>
{% endif %}
<h2>As at {{ date.strftime('%d %B %Y') }}</h2> <h2>As at {{ date.strftime('%d %B %Y') }}</h2>
<table class="ledger"> <table class="ledger">
@ -36,56 +40,81 @@
<th style="max-width: 8em;">Account</th> <th style="max-width: 8em;">Account</th>
<th class="h1" style="text-align: right; width: 5em;">Dr</th> <th class="h1" style="text-align: right; width: 5em;">Dr</th>
<th class="h1" style="text-align: right; width: 5em;">Cr</th> <th class="h1" style="text-align: right; width: 5em;">Cr</th>
<th class="h1" style="text-align: right; width: 6em;">Balance</th> {% if account %}<th class="h1" style="text-align: right; width: 6em;">Balance</th>{% endif %}
</tr>
<tr class="total">
<td>{{ pstart.strftime('%Y-%m-%d') }}</td>
<td>Opening Balance</td>
<td></td>
<td style="text-align: right;"></td>
<td style="text-align: right;"></td>
<td style="text-align: right;">
{% if opening_balance >= 0 %}
{{ opening_balance|b }} Dr
{% else %}
{{ -opening_balance|b }} Cr
{% endif %}
</td>
</tr> </tr>
{% set ns = namespace(balance=None) %}
{% if account %}
<tr class="total">
<td>{{ pstart.strftime('%Y-%m-%d') }}</td>
<td>Opening Balance</td>
<td></td>
<td style="text-align: right;"></td>
<td style="text-align: right;"></td>
<td style="text-align: right;">
{% if opening_balance >= 0 %}
{{ opening_balance|b }} Dr
{% else %}
{{ -opening_balance|b }} Cr
{% endif %}
</td>
</tr>
{% set ns.balance = opening_balance %}
{% endif %}
{% for transaction in transactions %} {% for transaction in transactions %}
{% for posting in transaction.postings if posting.amount >= 0.05 or posting.amount < -0.05 %} {% for posting in transaction.postings if (posting.amount.amount >= 0.05 or posting.amount.amount < -0.05) and posting.account != account %}
{% set amount = posting.exchange(report_currency, transaction.date) %}
{% set ns.balance = ns.balance - amount %}
<tr> <tr>
<td>{% if loop.first %}{{ transaction.date.strftime('%Y-%m-%d') }}{% endif %}</td> <td>{% if loop.first %}{{ transaction.date.strftime('%Y-%m-%d') }}{% endif %}</td>
<td>{% if loop.first %}{{ transaction.payee }}{% endif %}</td> <td>{% if loop.first %}{{ transaction.description }}{% endif %}</td>
<td><a href="/transactions?{{ {'date': date.strftime('%Y-%m-%d'), 'pstart': pstart.strftime('%Y-%m-%d'), 'account': posting.account}|urlencode }}">{{ (posting.account|e).__str__().replace(':', ':<wbr>')|safe }}</a></td> <td><a href="/transactions?{{ {'date': date.strftime('%Y-%m-%d'), 'pstart': pstart.strftime('%Y-%m-%d'), 'account': posting.account.name}|urlencode }}">{{ (posting.account.name|e).__str__().replace(':', ':<wbr>')|safe }}</a></td>
<td style="text-align: right;">{% if posting.amount > 0 %}{{ posting.amount|b }}{% endif %}</td> {% if account %}
<td style="text-align: right;">{% if posting.amount < 0 %}{{ -posting.amount|b }}{% endif %}</td> {# Reverse Dr/Cr so it's from the "perspective" of this account #}
<td style="text-align: right;"> <td style="text-align: right;">{% if amount < 0 %}<span title="{{ (-posting.amount).tostr(False) }}">{{ -amount|b }}</span>{% endif %}</td>
{% if loop.last %} <td style="text-align: right;">{% if amount > 0 %}<span title="{{ posting.amount.tostr(False) }}">{{ amount|b }}</span>{% endif %}</td>
{% if posting.balance >= 0 %} <td style="text-align: right;">
{{ posting.balance|b }} Dr {% if loop.last %}
{% else %} {% if ns.balance >= 0 %}
{{ -posting.balance|b }} Cr {{ ns.balance|b }} Dr
{% else %}
{{ -ns.balance|b }} Cr
{% endif %}
{% endif %} {% endif %}
{% endif %} </td>
</td> {% else %}
<td style="text-align: right;">{% if amount > 0 %}<span title="{{ posting.amount.tostr(False) }}">{{ amount|b }}</span>{% endif %}</td>
<td style="text-align: right;">{% if amount < 0 %}<span title="{{ (-posting.amount).tostr(False) }}">{{ -amount|b }}</span>{% endif %}</td>
{% endif %}
</tr> </tr>
{% endfor %} {% endfor %}
{% endfor %} {% endfor %}
<tr class="total">
<td>{{ date.strftime('%Y-%m-%d') }}</td> {% if account %}
<td>Closing Balance</td> <tr class="total">
<td></td> <td>{{ date.strftime('%Y-%m-%d') }}</td>
<td style="text-align: right;"></td> <td>Closing Balance</td>
<td style="text-align: right;"></td> <td></td>
<td style="text-align: right;"> <td style="text-align: right;"></td>
{% if closing_balance >= 0 %} <td style="text-align: right;"></td>
{{ closing_balance|b }} Dr <td style="text-align: right;">
{% else %} {% if closing_balance >= 0 %}
{{ -closing_balance|b }} Cr {{ closing_balance|b }} Dr
{% endif %} {% else %}
</td> {{ -closing_balance|b }} Cr
</tr> {% endif %}
</td>
</tr>
{% else %}
<tr class="total">
<td>{{ date.strftime('%Y-%m-%d') }}</td>
<td>Total</td>
<td></td>
<td style="text-align: right;">{{ total_dr|b }}</td>
<td style="text-align: right;">{{ -total_cr|b }}</td>
</tr>
{% endif %}
</table> </table>
</body> </body>
</html> </html>

View File

@ -34,8 +34,9 @@
<th class="h1">Dr</th> <th class="h1">Dr</th>
<th class="h1">Cr</th> <th class="h1">Cr</th>
</tr> </tr>
{% for account in trial_balance.accounts.values() %} {% for account in accounts %}
{% set balance = trial_balance.get_balance(account.name) %} {# Display in "cost basis" as we have already accounted for unrealised gains #}
{% set balance = trial_balance.get_balance(account).exchange(report_currency, True) %}
{% set trn_url = "/transactions?" + {'date': trial_balance.date.strftime('%Y-%m-%d'), 'pstart': trial_balance.pstart.strftime('%Y-%m-%d'), 'account': account.name}|urlencode %} {% set trn_url = "/transactions?" + {'date': trial_balance.date.strftime('%Y-%m-%d'), 'pstart': trial_balance.pstart.strftime('%Y-%m-%d'), 'account': account.name}|urlencode %}
{% if balance != 0 %} {% if balance != 0 %}
<tr> <tr>

View File

@ -37,7 +37,7 @@
<tr> <tr>
<td>{{ account.name }}</td> <td>{{ account.name }}</td>
{% for trial_balance in trial_balances %} {% for trial_balance in trial_balances %}
{% set balance = trial_balance.get_balance(account.name) %} {% set balance = trial_balance.get_balance(account).exchange(report_currency, True) %}
<td>{% if balance != 0 %}{{ balance|a }}{% endif %}</td> <td>{% if balance != 0 %}{{ balance|a }}{% endif %}</td>
{% endfor %} {% endfor %}
</tr> </tr>

View File

@ -14,22 +14,19 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from .config import config
from .model import *
import csv import csv
from datetime import datetime, timedelta from datetime import datetime, timedelta
from decimal import Decimal from decimal import Decimal
import re import re
import subprocess import subprocess
import yaml
# Load config
with open('config.yml', 'r') as f:
config = yaml.safe_load(f)
# Helper commands to run Ledger # Helper commands to run Ledger
def run_ledger(*args): def run_ledger(*args):
ledger_args = ['ledger', '--args-only', '--file', config['ledger_file'], '-X', config['report_currency'], '--date-format', '%Y-%m-%d', '--unround'] + config['ledger_args'] + list(args) ledger_args = ['ledger', '--args-only', '--file', config['ledger_file'], '--date-format', '%Y-%m-%d', '--unround'] + config['ledger_args'] + list(args)
#print(' '.join(ledger_args))
proc = subprocess.Popen(ledger_args, encoding='utf-8', stdout=subprocess.PIPE, stderr=subprocess.PIPE) proc = subprocess.Popen(ledger_args, encoding='utf-8', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate() stdout, stderr = proc.communicate()
@ -51,221 +48,56 @@ def financial_year(date):
# Ledger logic # Ledger logic
class Account:
def __init__(self, name):
if not isinstance(name, str):
raise Exception('Account name must be a string')
self.name = name
self.parent = None
self.children = []
def __repr__(self):
return '<Account "{}">'.format(self.name)
@property
def bits(self):
return self.name.split(':')
def matches(self, needle):
if self.name == needle or self.name.startswith(needle + ':'):
return True
return False
def insert_into_tree(self, accounts):
if ':' in self.name:
parent_name = self.name[:self.name.rindex(':')]
if parent_name not in accounts:
parent = Account(parent_name)
accounts[parent_name] = parent
parent.insert_into_tree(accounts)
self.parent = accounts[parent_name]
if self not in self.parent.children:
self.parent.children.append(self)
@property
def is_asset(self):
return self.matches(config['assets_account'])
@property
def is_liability(self):
return self.matches(config['liabilities_account'])
@property
def is_equity(self):
return self.matches(config['equity_account'])
@property
def is_income(self):
return self.matches(config['income_account'])
@property
def is_expense(self):
return self.matches(config['expenses_account'])
@property
def is_cash(self):
return any(self.matches(a) for a in config['cash_asset_accounts'])
def get_transactions(self, date, pstart):
transactions = []
output = run_ledger_date(date, 'register', '--no-rounding', '--register-format', '%(quoted(format_date(date))),%(quoted(payee)),%(quoted(account)),%(quoted(display_amount))\n', '--limit', 'account=~/^{}$/'.format(re.escape(self.name).replace('/', '\\/')), '--cost' if self.is_income or self.is_expense else '--market', '--related-all', '--no-revalued')
output += run_ledger_date(date, 'register', '--no-rounding', '--register-format', '%(quoted(format_date(date))),%(quoted(payee)),%(quoted(account)),%(quoted(display_amount))\n', '--limit', 'account=~/^{}$/'.format(re.escape(self.name).replace('/', '\\/')), '--cost' if self.is_income or self.is_expense else '--market', '--revalued-only')
reader = csv.reader(output.splitlines())
for row in reader:
t_date = datetime.strptime(row[0], '%Y-%m-%d')
if t_date < pstart:
continue
posting = Posting(row[2], parse_amount(row[3]))
if posting.account == '<Revalued>':
posting.account = self.name
if transactions and t_date == transactions[-1].date and row[1] == transactions[-1].payee:
# Posting for previous transaction
transactions[-1].postings.append(posting)
else:
# New transaction
transactions.append(Transaction(t_date, row[1], [posting]))
transactions.sort(key=lambda t: t.date)
# Balance transactions
for transaction in transactions:
t_total = sum(p.amount for p in transaction.postings)
if t_total != 0:
# Transaction requires balancing, probably due to unrealised gain/revaluation?
transaction.postings.append(Posting(config['unrealized_gains'], -t_total))
return transactions
class Transaction:
def __init__(self, date, payee, postings):
self.date = date
self.payee = payee
self.postings = postings
class Posting:
def __init__(self, account, amount):
self.account = account
self.amount = amount
self.balance = None
class Snapshot:
def __init__(self, date):
self.date = date
self.pstart = None
self.accounts = {}
self.balances = {}
def get_account(self, account_name):
if account_name not in self.accounts:
account = Account(account_name)
self.accounts[account_name] = account
account.insert_into_tree(self.accounts)
return self.accounts[account_name]
def set_balance(self, account_name, balance):
if account_name not in self.accounts:
account = Account(account_name)
self.accounts[account_name] = account
account.insert_into_tree(self.accounts)
if account_name not in self.balances:
self.balances[account_name] = Decimal(0)
self.balances[account_name] = balance
def get_balance(self, account_name):
if account_name not in self.accounts:
self.set_balance(account_name, Decimal(0))
if account_name not in self.balances:
self.balances[account_name] = Decimal(0)
return self.balances[account_name]
def get_total(self, account_name):
return self.get_balance(account_name) + sum(self.get_total(c.name) for c in self.accounts[account_name].children)
def parse_amount(amount): def parse_amount(amount):
if amount == '' or amount == '0': if '{' in amount:
return Decimal(0) amount_str = amount[:amount.index('{')].strip()
if not amount.startswith(config['report_currency']): price_str = amount[amount.index('{')+1:amount.index('}')].strip()
raise Exception('Unexpected currency returned by ledger: {}'.format(amount)) else:
return Decimal(amount[len(config['report_currency']):]) amount_str = amount
price_str = None
def get_accounts(): if amount_str[0] in list('0123456789-'):
output = run_ledger('balance', '--balance-format', '%(account)\n', '--no-total', '--flat', '--empty') # Currency follows number
account_names = output.rstrip('\n').split('\n') bits = amount_str.split()
amount_num = Decimal(bits[0])
currency = Currency(bits[1], False)
else:
# Currency precedes number
currency = Currency(amount_str[0], True)
amount_num = Decimal(amount_str[1:])
accounts = {n: Account(n) for n in account_names} if price_str:
currency.price = parse_amount(price_str)
for account in list(accounts.values()): return Amount(amount_num, currency)
account.insert_into_tree(accounts)
return accounts def get_pricedb():
output = run_ledger('prices', '--prices-format', '%(quoted(format_date(date))),%(quoted(display_account)),%(quoted(display_amount))\n')
# Raw Ledger output, unlikely to balance prices = []
def get_raw_snapshot(date, basis=None):
snapshot = Snapshot(date)
# Get balances from Ledger
output = (
run_ledger_date(date, 'balance', '--balance-format', '%(quoted(account)),%(quoted(display_total))\n', '--no-total', '--flat', '--empty', basis if basis is not None else '--market', config['assets_account'], config['liabilities_account'], config['equity_account']) +
run_ledger_date(date, 'balance', '--balance-format', '%(quoted(account)),%(quoted(display_total))\n', '--no-total', '--flat', '--empty', basis if basis is not None else '--cost', config['income_account'], config['expenses_account'])
)
reader = csv.reader(output.splitlines()) reader = csv.reader(output.splitlines())
for row in reader: for date_str, currency, price_str in reader:
snapshot.set_balance(row[0], parse_amount(row[1])) prices.append((datetime.strptime(date_str, '%Y-%m-%d'), currency, parse_amount(price_str)))
return snapshot return prices
# Ledger output, adjusted for Unrealized Gains def raw_transactions_at_date(date):
def get_snapshot(date): ledger = Ledger(date)
snapshot_cost = get_raw_snapshot(date, '--cost') ledger.prices = get_pricedb()
snapshot = get_raw_snapshot(date)
market_total = Decimal(0) output = run_ledger_date(date, 'csv', '--csv-format', '%(quoted(parent.id)),%(quoted(format_date(date))),%(quoted(payee)),%(quoted(account)),%(quoted(display_amount))\n')
cost_total = Decimal(0)
# Calculate unrealized gains reader = csv.reader(output.splitlines())
for account in snapshot.accounts.values(): for trn_id, date_str, payee, account_str, amount_str in reader:
if account.is_asset or account.is_liability: if not ledger.transactions or trn_id != ledger.transactions[-1].id:
market_total += snapshot.get_balance(account.name) transaction = Transaction(ledger, trn_id, datetime.strptime(date_str, '%Y-%m-%d'), payee)
cost_total += snapshot_cost.get_balance(account.name) ledger.transactions.append(transaction)
else:
# Transaction ID matches: continuation of previous transaction
transaction = ledger.transactions[-1]
# Add Unrealized Gains account posting = Posting(transaction, ledger.get_account(account_str), parse_amount(amount_str))
unrealized_gains_amt = market_total - cost_total transaction.postings.append(posting)
snapshot.set_balance(config['unrealized_gains'], snapshot.get_balance(config['unrealized_gains']) - unrealized_gains_amt)
return snapshot return ledger
# Ledger output, simulating closing of books
def trial_balance(date, pstart):
# Get balances at period start
snapshot_pstart = get_snapshot(pstart - timedelta(days=1))
# Get balances at date
snapshot = get_snapshot(date)
snapshot.pstart = pstart
# Calculate Retained Earnings, and adjust income/expense accounts
total_pandl = Decimal(0)
for account in snapshot_pstart.accounts.values():
if account.is_income or account.is_expense:
total_pandl += snapshot_pstart.get_balance(account.name)
# Add Retained Earnings account
snapshot.set_balance(config['retained_earnings'], snapshot.get_balance(config['retained_earnings']) + total_pandl)
# Adjust income/expense accounts
for account in snapshot.accounts.values():
if account.is_income or account.is_expense:
snapshot.set_balance(account.name, snapshot.get_balance(account.name) - snapshot_pstart.get_balance(account.name))
return snapshot

299
ledger_pyreport/model.py Normal file
View File

@ -0,0 +1,299 @@
# ledger-pyreport
# Copyright © 2020 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from .config import config
from decimal import Decimal
import functools
class Ledger:
def __init__(self, date):
self.date = date
self.root_account = Account(self, '')
self.accounts = {}
self.transactions = []
self.prices = []
def get_account(self, name):
if name == '':
return self.root_account
if name in self.accounts:
return self.accounts[name]
account = Account(self, name)
if account.parent:
account.parent.children.append(account)
self.accounts[name] = account
return account
def get_price(self, currency_from, currency_to, date):
prices = [p for p in self.prices if p[1] == currency_from.name and p[2].currency == currency_to and p[0].date() <= date.date()]
if not prices:
raise Exception('No price information for {} to {} at {:%Y-%m-%d}'.format(currency_from, currency_to, date))
return max(prices, key=lambda p: p[0])[2]
class Transaction:
def __init__(self, ledger, id, date, description):
self.ledger = ledger
self.id = id
self.date = date
self.description = description
self.postings = []
def __repr__(self):
return '<Transaction {} "{}">'.format(self.id, self.description)
def describe(self):
result = ['{:%Y-%m-%d} {}'.format(self.date, self.description)]
for posting in self.postings:
result.append(' {} {}'.format(posting.account.name, posting.amount.tostr(False)))
return '\n'.join(result)
class Posting:
def __init__(self, transaction, account, amount):
self.transaction = transaction
self.account = account
self.amount = Amount(amount)
def __repr__(self):
return '<Posting "{}" {}>'.format(self.account.name, self.amount.tostr(False))
def exchange(self, currency, date):
if self.amount.currency.name == currency.name and self.amount.currency.is_prefix == currency.is_prefix:
return Amount(self.amount)
return self.amount.exchange(currency, True) # Cost basis
class Account:
def __init__(self, ledger, name):
self.ledger = ledger
self.name = name
self.children = []
def __repr__(self):
return '<Account {}>'.format(self.name)
@property
def bits(self):
return self.name.split(':')
@property
def parent(self):
if self.name == '':
return None
return self.ledger.get_account(':'.join(self.bits[:-1]))
def matches(self, part):
if self.name == part or self.name.startswith(part + ':'):
return True
return False
@property
def is_income(self):
return self.matches(config['income_account'])
@property
def is_expense(self):
return self.matches(config['expenses_account'])
@property
def is_equity(self):
return self.matches(config['equity_account'])
@property
def is_asset(self):
return self.matches(config['assets_account'])
@property
def is_liability(self):
return self.matches(config['liabilities_account'])
@property
def is_cost(self):
return self.is_income or self.is_expense or self.is_equity
@property
def is_market(self):
return self.is_asset or self.is_liability
class Amount:
def __init__(self, amount, currency=None):
if isinstance(amount, Amount):
self.amount = amount.amount
self.currency = amount.currency
elif currency is None:
raise TypeError('currency is required')
else:
self.amount = Decimal(amount)
self.currency = currency
def tostr(self, round=True):
if self.currency.is_prefix:
amount_str = ('{}{:.2f}' if round else '{}{}').format(self.currency.name, self.amount)
else:
amount_str = ('{:.2f} {}' if round else '{:.2f} {}').format(self.amount, self.currency.name)
if self.currency.price:
return '{} {{{}}}'.format(amount_str, self.currency.price)
return amount_str
def __repr__(self):
return '<Amount {}>'.format(self.tostr(False))
def __str__(self):
return self.tostr()
def compatible_currency(func):
@functools.wraps(func)
def wrapped(self, other):
if isinstance(other, Amount):
if other.currency != self.currency:
raise TypeError('Cannot combine Amounts of currency {} and {}'.format(self.currency.name, other.currency.name))
other = other.amount
elif other != 0:
raise TypeError('Cannot combine Amount with non-zero number')
return func(self, other)
return wrapped
def __neg__(self):
return Amount(-self.amount, self.currency)
@compatible_currency
def __eq__(self, other):
return self.amount == other
@compatible_currency
def __ne__(self, other):
return self.amount != other
@compatible_currency
def __gt__(self, other):
return self.amount > other
@compatible_currency
def __ge__(self, other):
return self.amount >= other
@compatible_currency
def __lt__(self, other):
return self.amount < other
@compatible_currency
def __le__(self, other):
return self.amount <= other
@compatible_currency
def __add__(self, other):
return Amount(self.amount + other, self.currency)
@compatible_currency
def __radd__(self, other):
return Amount(other + self.amount, self.currency)
@compatible_currency
def __sub__(self, other):
return Amount(self.amount - other, self.currency)
@compatible_currency
def __rsub__(self, other):
return Amount(other - self.amount, self.currency)
def exchange(self, currency, is_cost, price=None):
if self.currency.name == currency.name and self.currency.is_prefix == currency.is_prefix:
return Amount(self)
if is_cost and self.currency.price and self.currency.price.currency.name == currency.name and self.currency.price.currency.is_prefix == currency.is_prefix:
return Amount(self.amount * self.currency.price.amount, currency)
if price:
return Amount(self.amount * price.amount, currency)
raise TypeError('Cannot exchange {} to {}'.format(self.currency, currency))
class Balance:
def __init__(self, amounts=None):
self.amounts = amounts or []
def tidy(self):
new_amounts = []
for amount in self.amounts:
new_amount = next((a for a in new_amounts if a.currency == amount.currency), None)
return Balance(new_amounts)
def exchange(self, currency, is_cost, date=None, ledger=None):
result = Amount(0, currency)
for amount in self.amounts:
if is_cost or amount.currency.name == currency.name and amount.currency.is_prefix == amount.currency.is_prefix:
result += amount.exchange(currency, is_cost)
else:
result += amount.exchange(currency, is_cost, ledger.get_price(amount.currency, currency, date))
return result
def __neg__(self):
return Balance([-a for a in self.amounts])
def __eq__(self, other):
if isinstance(other, Balance):
raise Exception('NYI')
elif isinstance(other, Amount):
raise Exception('NYI')
elif other == 0:
return len(self.amounts) == 0
else:
raise TypeError('Cannot compare Balance with non-zero number')
def __add__(self, other):
new_amounts = [Amount(a) for a in self.amounts]
if isinstance(other, Balance):
for amount in other.amounts:
new_amount = next((a for a in new_amounts if a.currency == amount.currency), None)
if new_amount is None:
new_amount = Amount(0, amount.currency)
new_amounts.append(new_amount)
new_amount.amount += amount.amount
elif isinstance(other, Amount):
new_amount = next((a for a in new_amounts if a.currency == other.currency), None)
if new_amount is None:
new_amount = Amount(0, other.currency)
new_amounts.append(new_amount)
new_amount.amount += other.amount
else:
raise Exception('NYI')
return Balance(new_amounts)
class Currency:
def __init__(self, name, is_prefix, price=None):
self.name = name
self.is_prefix = is_prefix
self.price = price
def __repr__(self):
return '<Currency {} ({})>'.format(self.name, 'prefix' if self.is_prefix else 'suffix')
def __eq__(self, other):
if not isinstance(other, Currency):
return False
return self.name == other.name and self.is_prefix == other.is_prefix and self.price == other.price
class TrialBalance:
def __init__(self, ledger, date, pstart):
self.ledger = ledger
self.date = date
self.pstart = pstart
self.balances = {}
def get_balance(self, account):
return self.balances.get(account.name, Balance())
def get_total(self, account):
return self.get_balance(account) + sum(self.get_total(a) for a in account.children)