This repository has been archived on 2024-11-09. You can view files and clone it, but cannot push or open issues or pull requests.
ledger-pyreport/ledger_pyreport/ledger.py

183 lines
6.4 KiB
Python

# ledger-pyreport
# Copyright © 2020 Lee Yingtong Li (RunasSudo)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import csv
from datetime import timedelta
from decimal import Decimal
import subprocess
import yaml
# Load config
with open('config.yml', 'r') as f:
config = yaml.safe_load(f)
class Account:
def __init__(self, name, balance=None):
if balance is None:
balance = Decimal(0)
self.name = name
self.balance = balance
self.parent = None
self.children = []
@property
def name_parts(self):
return self.name.split(':')
def total(self):
result = self.balance
for child in self.children:
result += child.total()
return result
def run_ledger(*args):
proc = subprocess.Popen(['ledger', '--args-only', '--file', config['ledger_file'], '-X', config['report_currency'], '--unround'] + config['ledger_args'] + list(args), encoding='utf-8', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()
if stderr:
raise Exception(stderr)
return stdout
def run_ledger_date(date, *args):
return run_ledger('--end', (date + timedelta(days=1)).strftime('%Y-%m-%d'), *args)
BALANCE_FORMAT = '%(quoted(display_total)),%(quoted(account))\n'
def parse_balance(output):
reader = csv.reader(output.splitlines())
accounts = []
# Parse balance lines
for row in reader:
balance = row[0]
if balance.startswith(config['report_currency']):
balance = balance[1:]
accounts.append(Account(row[1], Decimal(balance)))
return accounts
def make_account_tree(accounts):
accounts_map = {}
for account in accounts:
accounts_map[account.name] = account
for i in range(1, len(account.name_parts)):
parent_name = ':'.join(account.name_parts[:i])
if parent_name not in accounts_map:
accounts_map[parent_name] = Account(parent_name, Decimal(0))
for account in accounts_map.values():
if len(account.name_parts) > 1:
account.parent = accounts_map[':'.join(account.name_parts[:-1])]
account.parent.children.append(account)
return accounts_map
# Return a regex for an account and its children
def aregex(account):
return '^{0}:|^{0}$'.format(account)
def amatch(needle, haystack):
if haystack == needle or haystack.startswith(needle + ':'):
return True
return False
def financial_year(date):
pstart = date.replace(day=1, month=7)
if pstart > date:
pstart = pstart.replace(year=pstart.year - 1)
return pstart
# Calculate Unrealized Gains
def unrealized_gains(date):
accounts_cost = parse_balance(run_ledger_date(date, 'balance', '--balance-format', BALANCE_FORMAT, '--no-total', '--flat', '--cost', aregex(config['assets_account']), aregex(config['liabilities_account'])))
accounts_current = parse_balance(run_ledger_date(date, 'balance', '--balance-format', BALANCE_FORMAT, '--no-total', '--flat', '--market', aregex(config['assets_account']), aregex(config['liabilities_account'])))
total_cost = sum(a.balance for a in accounts_cost)
total_current = sum(a.balance for a in accounts_current)
unrealized_gains = total_current - total_cost
return unrealized_gains
# Get account balances at date
def get_accounts(date, cash=False):
# Calculate Unrealized Gains
unrealized_gains_amt = unrealized_gains(date)
# Get account balances
accounts = parse_balance(run_ledger_date(date, 'balance', '--balance-format', BALANCE_FORMAT, '--no-total', '--flat', '--cost', aregex(config['income_account']), aregex(config['expenses_account'])))
accounts += parse_balance(run_ledger_date(date, 'balance', '--balance-format', BALANCE_FORMAT, '--no-total', '--flat', '--market', aregex(config['assets_account']), aregex(config['liabilities_account']), aregex(config['equity_account'])))
# Add Unrealized Gains
accounts.append(Account(config['unrealized_gains'], -unrealized_gains_amt))
accounts.sort(key=lambda a: a.name)
# Convert to cash basis
if cash:
accounts_map = make_account_tree(accounts)
for account in accounts[:]:
if amatch(config['liabilities_account'], account.name) or (amatch(config['assets_account'], account.name) and not any(amatch(x, account.name) for x in config['cash_asset_accounts'])):
drcr = parse_balance(run_ledger_date(date, 'balance', '--related', '--balance-format', BALANCE_FORMAT, '--no-total', '--flat', '--cost' if amatch(config['income_account'], account.name) or amatch(config['expenses_account'], account.name) else '--market', aregex(account.name)))
for drcr_account in drcr:
accounts_map[drcr_account.name].balance -= drcr_account.balance
accounts.remove(account)
del accounts_map[account.name]
return accounts
# Calculate trial balance
def trial_balance(date, pstart, cash=False):
# Get balances at period start
accounts_pstart = get_accounts(pstart - timedelta(days=1), cash)
accounts_map_pstart = make_account_tree(accounts_pstart)
# Get balances at date
accounts = get_accounts(date, cash)
# Adjust Retained Earnings
total_pandl = Decimal(0)
if config['income_account'] in accounts_map_pstart:
total_pandl = accounts_map_pstart[config['income_account']].total()
if config['expenses_account'] in accounts_map_pstart:
total_pandl += accounts_map_pstart[config['expenses_account']].total()
next(a for a in accounts if a.name == config['retained_earnings']).balance += total_pandl
# Adjust income/expense accounts
for account in accounts:
if amatch(config['income_account'], account.name) or amatch(config['expenses_account'], account.name):
if account.name in accounts_map_pstart:
account.balance -= accounts_map_pstart[account.name].balance
return accounts
# Calculate profit and loss
def pandl(date_beg, date_end, cash=False):
accounts = trial_balance(date_end, date_beg, cash)
for account in accounts[:]:
if not (amatch(config['income_account'], account.name) or amatch(config['expenses_account'], account.name)):
accounts.remove(account)
return accounts