234 lines
9.1 KiB
Python
234 lines
9.1 KiB
Python
# DrCr: Web-based double-entry bookkeeping framework
|
|
# Copyright (C) 2022–2024 Lee Yingtong Li (RunasSudo)
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
from flask import redirect, render_template, request, url_for
|
|
|
|
from drcr.models import AccountConfiguration, Amount, Posting, Transaction
|
|
from drcr.database import db
|
|
from drcr.plugins import render_plugin_template
|
|
from drcr.webapp import app, eofy_date
|
|
|
|
from .models import CGTAsset, CGTCostAdjustment
|
|
from .reports import tax_summary_report
|
|
|
|
from datetime import datetime
|
|
from math import copysign
|
|
|
|
@app.route('/tax/cgt-adjustments')
def cgt_adjustments():
    """Render the list of CGT cost adjustments, optionally filtered.

    Each of the query-string parameters ``account``, ``quantity``,
    ``commodity`` and ``acquisition_date`` (``YYYY-MM-DD``) that is present
    adds an equality condition on the column of the same name. Rows are
    ordered by ``dt`` descending, then ``account``, then ``id`` descending.
    """
    params = request.args

    query = db.select(CGTCostAdjustment).order_by(
        CGTCostAdjustment.dt.desc(),
        CGTCostAdjustment.account,
        CGTCostAdjustment.id.desc(),
    )

    # These filters compare the raw query-string value against the column
    for field in ('account', 'quantity', 'commodity'):
        if field in params:
            query = query.where(getattr(CGTCostAdjustment, field) == params[field])

    # The date filter needs parsing before it can be compared
    if 'acquisition_date' in params:
        acquired = datetime.strptime(params['acquisition_date'], '%Y-%m-%d')
        query = query.where(CGTCostAdjustment.acquisition_date == acquired)

    rows = db.session.scalars(query).all()
    return render_plugin_template('austax', 'cgt_adjustments.html', cgt_adjustments=rows)
|
|
|
|
@app.route('/tax/cgt-adjustments/new', methods=['GET', 'POST'])
def cgt_adjustment_new():
    """Show the blank adjustment form (GET) or create an adjustment (POST).

    On POST the form fields ``asset``, ``account``, ``acquisition_date``,
    ``dt``, ``description`` and ``cost_adjustment`` are read, a new
    ``CGTCostAdjustment`` is committed, and the client is redirected to the
    adjustments list. Both dates are parsed as ``YYYY-MM-DD``.
    """
    if request.method == 'GET':
        return render_plugin_template('austax', 'cgt_adjustments_edit.html', adjustment=None)

    form = request.form

    # Read the fields in a fixed order so the first invalid one is reported
    parsed_asset = Amount.parse(form['asset'])
    account = form['account']
    acquired = datetime.strptime(form['acquisition_date'], '%Y-%m-%d')
    adjusted_on = datetime.strptime(form['dt'], '%Y-%m-%d')
    description = form['description']
    # Only the numeric quantity of the parsed cost adjustment is stored
    cost = Amount.parse(form['cost_adjustment']).quantity

    new_adjustment = CGTCostAdjustment(
        quantity=parsed_asset.quantity,
        commodity=parsed_asset.commodity,
        account=account,
        acquisition_date=acquired,
        dt=adjusted_on,
        description=description,
        cost_adjustment=cost
    )
    db.session.add(new_adjustment)
    db.session.commit()

    return redirect(url_for('cgt_adjustments'))
|
|
|
|
@app.route('/tax/cgt-adjustments/edit', methods=['GET', 'POST'])
def cgt_adjustment_edit():
    """Show the edit form for an adjustment (GET) or save changes to it (POST).

    The adjustment is identified by the ``id`` query-string parameter.

    On GET the form is rendered with whatever ``db.session.get`` returns for
    that id (including ``None``).

    On POST the form fields ``asset``, ``account``, ``acquisition_date``,
    ``dt``, ``description`` and ``cost_adjustment`` are parsed, the stored
    adjustment is updated and committed, and the client is redirected to the
    adjustments list. If no adjustment exists with the given id, the request
    is aborted with HTTP 404 instead of failing on a ``None`` dereference.
    """
    # Imported locally so this route does not rely on the module-level
    # flask import list being extended.
    from flask import abort

    adjustment = db.session.get(CGTCostAdjustment, request.args['id'])

    if request.method == 'GET':
        return render_plugin_template('austax', 'cgt_adjustments_edit.html', adjustment=adjustment)

    if adjustment is None:
        abort(404)

    # Parse every field before touching the object, so a malformed value
    # cannot leave a half-updated adjustment in the session.
    asset = Amount.parse(request.form['asset'])
    account = request.form['account']
    acquisition_date = datetime.strptime(request.form['acquisition_date'], '%Y-%m-%d')
    dt = datetime.strptime(request.form['dt'], '%Y-%m-%d')
    description = request.form['description']
    cost_adjustment = Amount.parse(request.form['cost_adjustment']).quantity

    adjustment.quantity = asset.quantity
    adjustment.commodity = asset.commodity
    adjustment.account = account
    adjustment.acquisition_date = acquisition_date
    adjustment.dt = dt
    adjustment.description = description
    adjustment.cost_adjustment = cost_adjustment

    db.session.add(adjustment)
    db.session.commit()

    return redirect(url_for('cgt_adjustments'))
|
|
|
|
@app.route('/tax/cgt-adjustments/multi-new', methods=['GET', 'POST'])
def cgt_adjustment_multinew():
    """Show the multi-adjustment form (GET) or apportion one adjustment (POST).

    On POST, the postings to the form's ``account`` are replayed in
    transaction-date order to find the parcels of the form's ``commodity``
    that are still held. The total ``cost_adjustment`` is then split across
    those parcels in proportion to their quantity, rounded to whole units
    using largest remainders so the parts sum exactly to the total, and one
    ``CGTCostAdjustment`` is created per parcel.

    Raises:
        Exception: if a negative posting has no matching held parcel, or
            exceeds the matching parcel's quantity.
        NotImplementedError: if a negative posting only partially disposes
            of the matching parcel.
        AssertionError: if the apportioned parts do not sum to the total.
    """
    if request.method == 'GET':
        return render_plugin_template(
            'austax', 'cgt_adjustments_multinew.html',
            account=None,
            commodity=None,
            dt=None,
            description=None,
            cost_adjustment=None
        )

    # TODO: Preview mode?

    # Read all form inputs up front (loop-invariant, and fails fast on bad input)
    total_adjustment = Amount.parse(request.form['cost_adjustment']).quantity
    target_account = request.form['account']
    target_commodity = request.form['commodity']
    adjustment_dt = datetime.strptime(request.form['dt'], '%Y-%m-%d')
    description = request.form['description']

    # Get all postings to the CGT asset account
    cgt_postings = db.session.scalars(
        db.select(Posting)
        .where(Posting.account == target_account)
        .join(Posting.transaction)
        .order_by(Transaction.dt)
    ).all()

    # Process postings to determine final balances
    assets = []

    for posting in cgt_postings:
        # A commodity may carry a '{...}' suffix; only the part before it
        # (stripped) is compared with the requested commodity.
        if '{' in posting.commodity:
            base_commodity = posting.commodity[:posting.commodity.index('{')].strip()
        else:
            base_commodity = posting.commodity

        if base_commodity != target_commodity:
            continue

        if posting.quantity >= 0:
            assets.append(CGTAsset(posting.quantity, posting.commodity, posting.account, posting.transaction.dt))
            continue

        # Negative quantity: must exactly dispose of a parcel still held
        asset = next((a for a in assets if a.commodity == posting.commodity and a.account == posting.account), None)

        if asset is None:
            raise Exception('Attempted credit {} without preceding debit balance'.format(posting.amount()))
        if asset.quantity + posting.quantity < 0:
            raise Exception('Attempted credit {} with insufficient debit balance {}'.format(posting.amount(), asset.amount()))

        if asset.quantity + posting.quantity != 0:
            raise NotImplementedError('Partial disposal of CGT asset not implemented')

        assets.remove(asset)

    # Distribute total adjustment across matching assets
    total_quantity = sum(a.quantity for a in assets)
    cgt_adjustments = {}

    for asset in assets:
        cgt_adjustments[asset] = total_adjustment * asset.quantity / total_quantity

    # Round up as many as required to equal the total adjustment
    rounding_shortfall = abs(total_adjustment) - sum(int(abs(v)) for v in cgt_adjustments.values())
    largest_remainders = [(k, abs(v) - int(abs(v))) for k, v in cgt_adjustments.items()]
    largest_remainders.sort(key=lambda x: x[1], reverse=True)
    for asset, _ in largest_remainders[:rounding_shortfall]:
        share = cgt_adjustments[asset]
        # copysign() returns a float; cast back so an integer is stored
        cgt_adjustments[asset] = int(copysign(int(abs(share)) + 1, share))

    # Round others down (a no-op for the values already rounded up above)
    for asset, share in cgt_adjustments.items():
        cgt_adjustments[asset] = int(copysign(int(abs(share)), share))

    # Sanity check
    assert sum(v for v in cgt_adjustments.values()) == total_adjustment

    # Add adjustments
    for asset, share in cgt_adjustments.items():
        new_adjustment = CGTCostAdjustment(
            quantity=asset.quantity,
            commodity=asset.commodity,
            account=asset.account,
            acquisition_date=asset.acquisition_date,
            dt=adjustment_dt,
            description=description,
            cost_adjustment=share
        )
        db.session.add(new_adjustment)

    db.session.commit()

    return redirect(url_for('cgt_adjustments'))
|
|
|
|
@app.route('/tax/cgt-assets')
def cgt_assets():
    """Render the CGT asset register.

    Accounts whose configured kinds include ``austax.cgtasset`` are treated
    as CGT asset accounts. Their postings are replayed in transaction-date
    order: each non-negative posting creates a ``CGTAsset``, and each
    negative posting marks the first asset with the same commodity and
    account as disposed, recording the disposal date and a disposal value
    summed from the other ``drcr.asset`` postings of that transaction.
    Stored cost adjustments are then attached to their matching assets.

    Raises:
        Exception: if a negative posting has no matching asset or exceeds
            the matching asset's quantity, or if a cost adjustment matches
            no asset.
        NotImplementedError: if a negative posting only partially disposes
            of the matching asset.
    """
    # Find all CGT asset accounts
    account_configurations = AccountConfiguration.get_all_kinds()
    cgt_accounts = [
        account_name
        for account_name, kinds in account_configurations.items()
        if 'austax.cgtasset' in kinds
    ]

    # Get all postings to CGT asset accounts
    posting_query = (
        db.select(Posting)
        .where(Posting.account.in_(cgt_accounts))
        .join(Posting.transaction)
        .order_by(Transaction.dt)
    )
    cgt_postings = db.session.scalars(posting_query).all()

    # Process postings to determine final balances
    assets = []

    for posting in cgt_postings:
        if posting.commodity == '$':
            # FIXME: Detect this better
            continue

        if posting.quantity >= 0:
            acquired = CGTAsset(posting.quantity, posting.commodity, posting.account, posting.transaction.dt)
            assets.append(acquired)
            continue

        # Negative quantity: treat as a disposal of a previously seen asset
        disposed = next(
            (a for a in assets if a.commodity == posting.commodity and a.account == posting.account),
            None
        )

        if disposed is None:
            raise Exception('Attempted credit {} without preceding debit balance'.format(posting.amount()))

        remaining = disposed.quantity + posting.quantity
        if remaining < 0:
            raise Exception('Attempted credit {} with insufficient debit balance {}'.format(posting.amount(), disposed.amount()))
        if remaining != 0:
            raise NotImplementedError('Partial disposal of CGT asset not implemented')

        disposed.disposal_date = posting.transaction.dt

        # Calculate disposal value by searching for matching asset postings
        disposed.disposal_value = Amount(0, '$')
        for other_posting in posting.transaction.postings:
            other_kinds = account_configurations.get(other_posting.account, [])
            if posting != other_posting and 'drcr.asset' in other_kinds:
                disposed.disposal_value.quantity += other_posting.amount().as_cost().quantity

    # Process CGT adjustments
    stored_adjustments = db.session.scalars(db.select(CGTCostAdjustment)).all()
    for cost_adjustment in stored_adjustments:
        # An adjustment belongs to the first asset agreeing on all four fields
        target = next(
            (
                a for a in assets
                if a.quantity == cost_adjustment.quantity
                and a.commodity == cost_adjustment.commodity
                and a.account == cost_adjustment.account
                and a.acquisition_date == cost_adjustment.acquisition_date
            ),
            None
        )

        if target is None:
            raise Exception('No matching CGT asset for {}'.format(repr(cost_adjustment.asset())))

        target.cost_adjustments.append(cost_adjustment)

    return render_plugin_template('austax', 'cgt_assets.html', assets=assets, eofy_date=eofy_date())
|
|
|
|
@app.route('/tax/summary')
def tax_summary():
    """Render the tax summary report using the generic ``report.html`` template."""
    return render_template('report.html', report=tax_summary_report())
|