311 lines
10 KiB
Python
311 lines
10 KiB
Python
|
# scipy-yli: Helpful SciPy utilities and recipes
|
||
|
# Copyright © 2022 Lee Yingtong Li (RunasSudo)
|
||
|
#
|
||
|
# This program is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU Affero General Public License as published by
|
||
|
# the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# This program is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU Affero General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU Affero General Public License
|
||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
|
||
|
import numpy as np
|
||
|
import patsy
|
||
|
from scipy import stats
|
||
|
import statsmodels.api as sm
|
||
|
from statsmodels.iolib.table import SimpleTable
|
||
|
|
||
|
from datetime import datetime
|
||
|
import itertools
|
||
|
|
||
|
from .utils import Estimate, check_nan, fmt_p_html, fmt_p_text
|
||
|
|
||
|
def cols_for_formula(formula):
|
||
|
"""Return the columns corresponding to the Patsy formula"""
|
||
|
|
||
|
model_desc = patsy.ModelDesc.from_formula(formula)
|
||
|
cols = set()
|
||
|
for term in model_desc.rhs_termlist:
|
||
|
for factor in term.factors:
|
||
|
name = factor.name()
|
||
|
if '(' in name:
|
||
|
# FIXME: Is there a better way of doing this?
|
||
|
name = name[name.index('(')+1:name.index(')')]
|
||
|
|
||
|
cols.add(name)
|
||
|
|
||
|
return list(cols)
|
||
|
|
||
|
# ----------
|
||
|
# Regression
|
||
|
|
||
|
class LikelihoodRatioTestResult:
|
||
|
"""Result of a likelihood ratio test for regression"""
|
||
|
|
||
|
def __init__(self, statistic, dof, pvalue):
|
||
|
self.statistic = statistic
|
||
|
self.dof = dof
|
||
|
self.pvalue = pvalue
|
||
|
|
||
|
def _repr_html_(self):
|
||
|
return 'LR({}) = {:.2f}; <i>p</i> {}'.format(self.dof, self.statistic, fmt_p_html(self.pvalue))
|
||
|
|
||
|
def summary(self):
|
||
|
return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p_text(self.pvalue))
|
||
|
|
||
|
class FTestResult:
|
||
|
"""Result of an F test for regression"""
|
||
|
|
||
|
def __init__(self, statistic, dof_model, dof_resid, pvalue):
|
||
|
self.statistic = statistic
|
||
|
self.dof_model = dof_model
|
||
|
self.dof_resid = dof_resid
|
||
|
self.pvalue = pvalue
|
||
|
|
||
|
def _repr_html_(self):
|
||
|
return '<i>F</i>({}, {}) = {:.2f}; <i>p</i> {}'.format(self.dof_model, self.dof_resid, self.statistic, fmt_p_html(self.pvalue))
|
||
|
|
||
|
def summary(self):
|
||
|
return 'F({}, {}) = {:.2f}; p {}'.format(self.dof_model, self.dof_resid, self.statistic, fmt_p_text(self.pvalue))
|
||
|
|
||
|
class RegressionResult:
|
||
|
"""
|
||
|
Result of a regression
|
||
|
|
||
|
llf/llnull: Log-likelihood of model/null model
|
||
|
"""
|
||
|
|
||
|
def __init__(self,
|
||
|
raw_result,
|
||
|
full_name, model_name, fit_method,
|
||
|
dep, nobs, dof_model, fitted_dt,
|
||
|
beta, pvalues,
|
||
|
llf, llnull,
|
||
|
dof_resid, rsquared, f_statistic,
|
||
|
exp
|
||
|
):
|
||
|
# A copy of the raw result so we can access it
|
||
|
self.raw_result = raw_result
|
||
|
|
||
|
# Information for display
|
||
|
self.full_name = full_name
|
||
|
self.model_name = model_name
|
||
|
self.fit_method = fit_method
|
||
|
|
||
|
# Basic fitted model information
|
||
|
self.dep = dep
|
||
|
self.nobs = nobs
|
||
|
self.dof_model = dof_model
|
||
|
self.fitted_dt = fitted_dt
|
||
|
|
||
|
# Regression coefficients
|
||
|
self.beta = beta
|
||
|
self.pvalues = pvalues
|
||
|
|
||
|
# Model log-likelihood
|
||
|
self.llf = llf
|
||
|
self.llnull = llnull
|
||
|
|
||
|
# Extra statistics (not all regression models have these)
|
||
|
self.dof_resid = dof_resid
|
||
|
self.rsquared = rsquared
|
||
|
self.f_statistic = f_statistic
|
||
|
|
||
|
# Config for display style
|
||
|
self.exp = exp
|
||
|
|
||
|
@property
|
||
|
def pseudo_rsquared(self):
|
||
|
"""McFadden's pseudo R-squared"""
|
||
|
|
||
|
return 1 - self.llf/self.llnull
|
||
|
|
||
|
def lrtest_null(self):
|
||
|
"""Compute the likelihood ratio test comparing the model to the null model"""
|
||
|
|
||
|
statistic = -2 * (self.llnull - self.llf)
|
||
|
pvalue = 1 - stats.chi2.cdf(statistic, self.dof_model)
|
||
|
|
||
|
return LikelihoodRatioTestResult(statistic, self.dof_model, pvalue)
|
||
|
|
||
|
def ftest(self):
|
||
|
"""Perform the F test that all slopes are 0"""
|
||
|
|
||
|
pvalue = 1 - stats.f(self.dof_model, self.dof_resid).cdf(self.f_statistic)
|
||
|
return FTestResult(self.f_statistic, self.dof_model, self.dof_resid, pvalue)
|
||
|
|
||
|
def _header_table(self, html):
|
||
|
"""Return the entries for the header table"""
|
||
|
|
||
|
# Left column
|
||
|
left_col = []
|
||
|
|
||
|
left_col.append(('Dep. Variable:', self.dep))
|
||
|
left_col.append(('Model:', self.model_name))
|
||
|
left_col.append(('Method:', self.fit_method))
|
||
|
left_col.append(('Date:', self.fitted_dt.strftime('%Y-%m-%d')))
|
||
|
left_col.append(('Time:', self.fitted_dt.strftime('%H:%M:%S')))
|
||
|
left_col.append(('No. Observations:', format(self.nobs, '.0f')))
|
||
|
|
||
|
# Right column
|
||
|
right_col = []
|
||
|
|
||
|
if self.dof_resid:
|
||
|
right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f')))
|
||
|
right_col.append(('Df. Model:', format(self.dof_model, '.0f')))
|
||
|
if self.rsquared:
|
||
|
right_col.append(('<i>R</i><sup>2</sup>:' if html else 'R²:', format(self.rsquared, '.2f')))
|
||
|
else:
|
||
|
right_col.append(('Pseudo <i>R</i><sup>2</sup>:' if html else 'Pseudo R²:', format(self.pseudo_rsquared, '.2f')))
|
||
|
if self.f_statistic:
|
||
|
# Report the F test if available
|
||
|
f_result = self.ftest()
|
||
|
|
||
|
if html:
|
||
|
right_col.append(('<i>F</i>:', format(f_result.statistic, '.2f')))
|
||
|
right_col.append(('<i>p</i> (<i>F</i>):', fmt_p_html(f_result.pvalue, True)))
|
||
|
else:
|
||
|
right_col.append(('F:', format(f_result.statistic, '.2f')))
|
||
|
right_col.append(('p (F):', fmt_p_text(f_result.pvalue, True)))
|
||
|
else:
|
||
|
# Otherwise report likelihood ratio test as overall test
|
||
|
lrtest_result = self.lrtest_null()
|
||
|
|
||
|
right_col.append(('LL-Model:', format(self.llf, '.2f')))
|
||
|
right_col.append(('LL-Null:', format(self.llnull, '.2f')))
|
||
|
if html:
|
||
|
right_col.append(('<i>p</i> (LR):', fmt_p_html(lrtest_result.pvalue, True)))
|
||
|
else:
|
||
|
right_col.append(('p (LR):', fmt_p_text(lrtest_result.pvalue, True)))
|
||
|
|
||
|
return left_col, right_col
|
||
|
|
||
|
def _repr_html_(self):
|
||
|
# Render header table
|
||
|
left_col, right_col = self._header_table(html=True)
|
||
|
|
||
|
out = '<table><caption>{} Results</caption>'.format(self.full_name)
|
||
|
for left_cell, right_cell in itertools.zip_longest(left_col, right_col):
|
||
|
out += '<tr><th>{}</th><td>{}</td><th>{}</th><td>{}</td></tr>'.format(
|
||
|
left_cell[0] if left_cell else '',
|
||
|
left_cell[1] if left_cell else '',
|
||
|
right_cell[0] if right_cell else '',
|
||
|
right_cell[1] if right_cell else ''
|
||
|
)
|
||
|
out += '</table>'
|
||
|
|
||
|
# Render coefficients table
|
||
|
out += '<table><tr><th></th><th style="text-align:center">{}</th><th colspan="3" style="text-align:center">(95% CI)</th><th style="text-align:center"><i>p</i></th></tr>'.format('exp(<i>β</i>)' if self.exp else '<i>β</i>')
|
||
|
|
||
|
for term, beta in self.beta.items():
|
||
|
# Exponentiate if requested
|
||
|
if self.exp:
|
||
|
beta = np.exp(beta)
|
||
|
|
||
|
out += '<tr><th>{}</th><td>{:.2f}</td><td style="padding-right:0">({:.2f}</td><td>–</td><td style="padding-left:0">{:.2f})</td><td style="text-align:left">{}</td></tr>'.format(term, beta.point, beta.ci_lower, beta.ci_upper, fmt_p_html(self.pvalues[term], True))
|
||
|
|
||
|
out += '</table>'
|
||
|
|
||
|
# TODO: Have a detailed view which shows SE, t/z, etc.
|
||
|
|
||
|
return out
|
||
|
|
||
|
def summary(self):
|
||
|
# Render header table
|
||
|
left_col, right_col = self._header_table(html=False)
|
||
|
|
||
|
# Ensure equal sizes for SimpleTable
|
||
|
if len(right_col) > len(left_col):
|
||
|
left_col.extend([('', '')] * (len(right_col) - len(left_col)))
|
||
|
elif len(left_col) > len(right_col):
|
||
|
right_col.extend([('', '')] * (len(left_col) - len(right_col)))
|
||
|
|
||
|
table1 = SimpleTable(np.concatenate([left_col, right_col], axis=1), title='{} Results'.format(self.full_name))
|
||
|
table1.insert_stubs(2, [' | '] * len(left_col))
|
||
|
|
||
|
# Get rid of last line (merge with next table)
|
||
|
table1_lines = table1.as_text().splitlines(keepends=False)
|
||
|
out = '\n'.join(table1_lines[:-1]) + '\n'
|
||
|
|
||
|
# Render coefficients table
|
||
|
table_data = []
|
||
|
|
||
|
for term, beta in self.beta.items():
|
||
|
# Exponentiate if requested
|
||
|
if self.exp:
|
||
|
beta = np.exp(estimate)
|
||
|
|
||
|
# Add some extra padding
|
||
|
table_data.append([term + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p_text(self.pvalues[term], True)])
|
||
|
|
||
|
table2 = SimpleTable(data=table_data, headers=['', 'exp(β)' if self.exp else 'β', '', '\ue000', '', ' p']) # U+E000 is in Private Use Area, mark middle of CI column
|
||
|
table2_text = table2.as_text().replace(' \ue000 ', '(95% CI)') # Render heading in the right spot
|
||
|
table2_lines = table2_text.splitlines(keepends=False)
|
||
|
|
||
|
# Render divider line between 2 tables
|
||
|
max_table_len = max(len(table1_lines[-1]), len(table2_lines[-1]))
|
||
|
out += '=' * max_table_len + '\n'
|
||
|
|
||
|
out += '\n'.join(table2_lines[1:])
|
||
|
|
||
|
return out
|
||
|
|
||
|
def regress(
|
||
|
model_class, df, dep, formula, *,
|
||
|
nan_policy='warn', exp=None
|
||
|
):
|
||
|
"""Fit a statsmodels regression model"""
|
||
|
|
||
|
# Autodetect whether to exponentiate
|
||
|
if exp is None:
|
||
|
if model_class is sm.Logit:
|
||
|
exp = True
|
||
|
else:
|
||
|
exp = False
|
||
|
|
||
|
# Check for/clean NaNs
|
||
|
df = df[[dep] + cols_for_formula(formula)]
|
||
|
df = check_nan(df, nan_policy)
|
||
|
|
||
|
# Ensure numeric type for dependent variable
|
||
|
if df[dep].dtype != 'float64':
|
||
|
df[dep] = df[dep].astype('float64')
|
||
|
|
||
|
# Convert pandas nullable types for independent variables
|
||
|
for col in df.columns:
|
||
|
if df[col].dtype == 'Int64':
|
||
|
df[col] = df[col].astype('float64')
|
||
|
|
||
|
# Fit model
|
||
|
model = model_class.from_formula(formula=dep + ' ~ ' + formula, data=df)
|
||
|
result = model.fit()
|
||
|
|
||
|
confint = result.conf_int()
|
||
|
beta = {t: Estimate(b, confint[0][t], confint[1][t]) for t, b in result.params.items()}
|
||
|
|
||
|
# Fit null model (for llnull)
|
||
|
if hasattr(result, 'llnull'):
|
||
|
llnull = result.llnull
|
||
|
else:
|
||
|
result_null = model_class.from_formula(formula=dep + ' ~ 1', data=df).fit()
|
||
|
llnull = result_null.llf
|
||
|
|
||
|
# Parse raw regression results (to get fit method)
|
||
|
header_entries = np.vectorize(str.strip)(np.concatenate(np.split(np.array(result.summary().tables[0].data), 2, axis=1)))
|
||
|
header_dict = {x[0]: x[1] for x in header_entries}
|
||
|
|
||
|
return RegressionResult(
|
||
|
result,
|
||
|
'Logistic Regression' if model_class is sm.Logit else '{} Regression'.format(model_class.__name__), model_class.__name__, header_dict['Method:'],
|
||
|
dep, result.nobs, result.df_model, datetime.now(),
|
||
|
beta, result.pvalues,
|
||
|
result.llf, llnull,
|
||
|
getattr(result, 'df_resid', None), getattr(result, 'rsquared', None), getattr(result, 'fvalue', None),
|
||
|
exp
|
||
|
)
|