DrCr/drcr/webapp.py

153 lines
4.8 KiB
Python
Raw Normal View History

2022-12-24 12:31:45 +11:00
# DrCr: Web-based double-entry bookkeeping framework
2024-11-09 18:14:25 +11:00
# Copyright (C) 2022–2024 Lee Yingtong Li (RunasSudo)
2022-12-24 12:31:45 +11:00
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from flask import Flask, g
2023-01-05 16:19:29 +11:00
import toml
app = Flask(__name__)
app.config.from_file('config.toml', load=toml.load)
from flask_sqlalchemy.record_queries import get_recorded_queries
2022-12-24 12:31:45 +11:00
from .database import db
from .models import Amount, Metadata, Transaction, reporting_commodity
2023-01-04 18:01:59 +11:00
from .plugins import init_plugins, transaction_providers
2023-01-02 18:08:30 +11:00
from .statements.models import StatementLine
2022-12-24 12:31:45 +11:00
from datetime import datetime, timedelta
import time
app.config['SQLALCHEMY_RECORD_QUERIES'] = app.debug
db.init_app(app)
def limit_query_dt(query, field, start_date=None, end_date=None):
"""Helper function to limit the query between the start and end dates"""
if start_date and end_date:
return query.where((field >= start_date) & (field <= end_date))
if start_date:
return query.where(field >= start_date)
if end_date:
return query.where(field <= end_date)
return query
def all_transactions(start_date=None, end_date=None, join_postings=True):
"""Return all transactions, including from DB and API"""
# All Transactions in database between start_date and end_date
query = db.select(Transaction)
query = limit_query_dt(query, Transaction.dt, start_date, end_date)
if join_postings:
query = query.options(db.selectinload(Transaction.postings))
transactions = db.session.scalars(query).all()
2023-01-04 18:01:59 +11:00
transactions.extend(api_transactions(start_date=start_date, end_date=end_date))
return transactions
def api_transactions(start_date=None, end_date=None):
"""Return only transactions from API"""
transactions = []
2023-01-04 18:01:59 +11:00
# Unreconciled StatementLines
query = db.select(StatementLine).where(StatementLine.reconciliation == None)
query = limit_query_dt(query, StatementLine.dt, start_date, end_date)
transactions.extend(line.into_transaction() for line in db.session.scalars(query).all())
2023-01-04 18:01:59 +11:00
# Plugins
for transaction_provider in transaction_providers:
transactions.extend(transaction_provider(start_date=start_date, end_date=end_date))
2023-01-04 18:01:59 +11:00
return transactions
2022-12-24 12:31:45 +11:00
def all_accounts():
"""Return all accounts in alphabetical order"""
return db.session.scalars('SELECT DISTINCT account FROM postings ORDER BY account').all()
def eofy_date():
"""Get the datetime for the end of the financial year"""
return datetime.strptime(Metadata.get('eofy_date'), '%Y-%m-%d')
def sofy_date():
"""Get the datetime for the start of the financial year"""
dt = eofy_date()
dt = dt.replace(year=dt.year - 1)
dt += timedelta(days=1)
return dt
2022-12-24 15:18:53 +11:00
from . import views
2023-01-02 18:08:30 +11:00
from .journal import views
2022-12-24 12:31:45 +11:00
from .statements import views
2023-01-04 18:01:59 +11:00
init_plugins()
2022-12-24 12:31:45 +11:00
@app.cli.command('initdb')
def initdb():
"""Initialise database tables"""
db.create_all()
2023-07-03 22:31:58 +10:00
# FIXME: Need to init metadata
2022-12-24 12:31:45 +11:00
@app.cli.command('recache_balances')
def recache_balances():
"""Recompute running_balance for all postings"""
# Get all Transactions in database in correct order
transactions = db.session.scalars(db.select(Transaction).options(db.selectinload(Transaction.postings)).order_by(Transaction.dt, Transaction.id)).all()
accounts = {}
for transaction in transactions:
for posting in transaction.postings:
if posting.account not in accounts:
accounts[posting.account] = Amount(0, reporting_commodity())
# FIXME: Handle commodities better (ensure compatible commodities)
accounts[posting.account].quantity += posting.amount().as_cost().quantity
posting.running_balance = accounts[posting.account].quantity
db.session.commit()
2024-11-09 18:14:25 +11:00
@app.context_processor
def add_reporting_commodity():
return dict(reporting_commodity=reporting_commodity())
if app.debug:
@app.before_request
def before_request():
g.start = time.time()
@app.after_request
def after_request(response):
diff = time.time() - g.start
if response.response and response.status_code == 200 and response.content_type.startswith('text/html'):
response.set_data(response.get_data().replace(b'__EXECUTION_TIME__', bytes(format(diff * 1000, '.1f'), 'utf-8')))
return response
@app.context_processor
def add_dbtime():
def dbtime():
queries = get_recorded_queries()
total_duration = sum(q.duration for q in queries)
return format(total_duration * 1000, '.1f')
return dict(dbtime=dbtime)