#   DrCr: Web-based double-entry bookkeeping framework
#   Copyright (C) 2022–2024  Lee Yingtong Li (RunasSudo)
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU Affero General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU Affero General Public License for more details.
#
#   You should have received a copy of the GNU Affero General Public License
#   along with this program.  If not, see <https://www.gnu.org/licenses/>.

from datetime import datetime, timedelta
import time

from flask import Flask, g
import toml

app = Flask(__name__)
app.config.from_file('config.toml', load=toml.load)

# NOTE(review): these imports deliberately follow the creation of `app`
# (presumably because the project modules reference it) - keep this order.
from flask_sqlalchemy.record_queries import get_recorded_queries

from .database import db
from .models import Amount, Metadata, Transaction, reporting_commodity
from .plugins import init_plugins, transaction_providers
from .statements.models import StatementLine

# Only record per-request query statistics when running in debug mode
app.config['SQLALCHEMY_RECORD_QUERIES'] = app.debug
db.init_app(app)


def limit_query_dt(query, field, start_date=None, end_date=None):
    """Helper function to limit the query between the start and end dates

    Each bound is inclusive and is applied only if it is truthy; with
    neither bound given, the query is returned unchanged.
    """

    # Successive .where() calls are combined with AND
    if start_date:
        query = query.where(field >= start_date)
    if end_date:
        query = query.where(field <= end_date)
    return query


def all_transactions(start_date=None, end_date=None, join_postings=True):
    """Return all transactions, including from DB and API

    start_date/end_date optionally bound Transaction.dt (inclusive).
    If join_postings is true, postings of the database transactions are
    eagerly loaded with a SELECT IN load.
    """

    # All Transactions in database between start_date and end_date
    query = db.select(Transaction)
    query = limit_query_dt(query, Transaction.dt, start_date, end_date)
    if join_postings:
        query = query.options(db.selectinload(Transaction.postings))

    transactions = db.session.scalars(query).all()
    transactions.extend(api_transactions(start_date=start_date, end_date=end_date))

    return transactions


def api_transactions(start_date=None, end_date=None):
    """Return only transactions from API

    These are transactions derived from unreconciled statement lines,
    followed by whatever each registered transaction provider returns.
    """

    transactions = []

    # Unreconciled StatementLines
    # (`== None` is intentional: SQLAlchemy overloads `==` to build the SQL
    # expression, so `is None` must not be used here)
    query = db.select(StatementLine).where(StatementLine.reconciliation == None)
    query = limit_query_dt(query, StatementLine.dt, start_date, end_date)
    transactions.extend(line.into_transaction() for line in db.session.scalars(query).all())

    # Plugins
    for transaction_provider in transaction_providers:
        transactions.extend(transaction_provider(start_date=start_date, end_date=end_date))

    return transactions


def all_accounts():
    """Return the sorted, de-duplicated accounts of all postings of all transactions"""

    # TODO: Can this be cached?
    return sorted({p.account for t in all_transactions() for p in t.postings})


def eofy_date():
    """Get the datetime for the end of the financial year"""

    return datetime.strptime(Metadata.get('eofy_date'), '%Y-%m-%d')


def sofy_date():
    """Get the datetime for the start of the financial year

    This is the day after the same calendar date as eofy_date() in the
    previous year.
    """

    dt = eofy_date()
    try:
        dt = dt.replace(year=dt.year - 1)
    except ValueError:
        # End of financial year is 29 February, which does not exist in the
        # previous year: that financial year ended on 28 February instead
        dt = dt.replace(year=dt.year - 1, day=28)
    dt += timedelta(days=1)
    return dt


# NOTE(review): imported last and only for their side effects (presumably
# route registration); these modules likely import names defined above.
from . import views
from .journal import views
from .statements import views

init_plugins()


@app.cli.command('initdb')
def initdb():
    """Initialise database tables"""

    db.create_all()

    # FIXME: Need to init metadata


@app.cli.command('recache_balances')
def recache_balances():
    """Recompute running_balance for all postings"""

    # Get all Transactions in database in correct order
    transactions = db.session.scalars(
        db.select(Transaction)
        .options(db.selectinload(Transaction.postings))
        .order_by(Transaction.dt, Transaction.id)
    ).all()

    # Running total per account, accumulated in transaction order
    accounts = {}

    for transaction in transactions:
        for posting in transaction.postings:
            if posting.account not in accounts:
                accounts[posting.account] = Amount(0, reporting_commodity())

            # FIXME: Handle commodities better (ensure compatible commodities)
            accounts[posting.account].quantity += posting.amount().as_cost().quantity

            posting.running_balance = accounts[posting.account].quantity

    db.session.commit()


@app.context_processor
def add_reporting_commodity():
    """Make the current reporting commodity available to all templates"""

    return dict(reporting_commodity=reporting_commodity())


if app.debug:
    @app.before_request
    def before_request():
        """Record the time at which handling of the request started"""

        g.start = time.time()

    @app.after_request
    def after_request(response):
        """Substitute the __EXECUTION_TIME__ placeholder in successful HTML responses

        The placeholder is replaced with the elapsed request time in
        milliseconds, to 1 decimal place.
        """

        diff = time.time() - g.start

        # content_type is None when the response has no Content-Type header
        content_type = response.content_type or ''

        # Skip direct passthrough responses (e.g. files sent via send_file):
        # their body cannot be read with get_data() and doing so raises
        # RuntimeError
        if (
            response.response
            and not response.direct_passthrough
            and response.status_code == 200
            and content_type.startswith('text/html')
        ):
            elapsed_ms = format(diff * 1000, '.1f').encode('utf-8')
            response.set_data(response.get_data().replace(b'__EXECUTION_TIME__', elapsed_ms))

        return response

    # NOTE(review): nesting of this context processor under `if app.debug`
    # was reconstructed from a flattened source - confirm against upstream
    @app.context_processor
    def add_dbtime():
        """Expose dbtime() to templates: total recorded query time in ms, to 1 decimal place"""

        def dbtime():
            queries = get_recorded_queries()
            total_duration = sum(q.duration for q in queries)
            return format(total_duration * 1000, '.1f')

        return dict(dbtime=dbtime)