scipy-yli/yli/regress.py

1145 lines
40 KiB
Python
Raw Normal View History

2022-10-13 15:57:56 +11:00
# scipy-yli: Helpful SciPy utilities and recipes
# Copyright © 2022–2023 Lee Yingtong Li (RunasSudo)
2022-10-13 15:57:56 +11:00
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import numpy as np
2022-10-13 16:24:01 +11:00
import pandas as pd
2022-11-29 14:40:50 +11:00
import patsy
2022-10-13 15:57:56 +11:00
from scipy import stats
from scipy.special import expit
2023-03-05 02:11:12 +11:00
import statsmodels, statsmodels.base.model, statsmodels.miscmodels.ordinal_model
2022-10-13 15:57:56 +11:00
import statsmodels.api as sm
from statsmodels.iolib.table import SimpleTable
2022-10-13 16:24:01 +11:00
from statsmodels.stats.outliers_influence import variance_inflation_factor
2022-10-13 15:57:56 +11:00
from datetime import datetime
2023-04-16 21:56:09 +10:00
import io
2022-10-13 15:57:56 +11:00
import itertools
2023-04-16 21:56:09 +10:00
import json
import subprocess
import weakref
2022-10-13 15:57:56 +11:00
from .bayes_factors import BayesFactor, bayesfactor_afbf
from .config import config
from .shap import ShapResult
2022-12-02 20:20:25 +11:00
from .sig_tests import ChiSquaredResult, FTestResult
from .utils import Estimate, PValueStyle, as_numeric, check_nan, cols_for_formula, convert_pandas_nullable, fmt_p, formula_factor_ref_category, parse_patsy_term
2022-10-13 15:57:56 +11:00
2023-04-16 21:56:09 +10:00
# TODO: Documentation
# TODO: Bootstrap
2022-10-17 21:41:19 +11:00
def vif(df, formula=None, *, nan_policy='warn'):
"""
2022-10-17 21:41:19 +11:00
Calculate the variance inflation factor (VIF) for each variable in *df*
2022-10-17 21:41:19 +11:00
:param df: Data to calculate the VIF for
:type df: DataFrame
:param formula: If specified, calculate the VIF only for the variables in the formula
:type formula: str
:return: The variance inflation factors
:rtype: Series
**Example:**
.. code-block::
df = pd.DataFrame({
'D': [68.58, 67.33, 67.33, ...],
'T1': [14, 10, 10, ...],
'T2': [46, 73, 85, ...],
...
})
yli.vif(df[['D', 'T1', 'T2', ...]])
.. code-block:: text
D 8.318301
T1 6.081590
T2 2.457122
...
dtype: float64
The output shows the variance inflation factor for each variable in *df*.
"""
if formula:
# Only consider columns in the formula
df = df[cols_for_formula(formula, df)]
2022-10-13 16:24:01 +11:00
# Check for/clean NaNs
df = check_nan(df, nan_policy)
# Convert all to float64 otherwise statsmodels chokes with "ufunc 'isfinite' not supported for the input types ..."
df = pd.get_dummies(df, drop_first=True) # Convert categorical dtypes
df = df.astype('float64') # Convert all other dtypes
# Add intercept column
orig_columns = list(df.columns)
df['Intercept'] = [1] * len(df)
vifs = {}
for i, col in enumerate(orig_columns):
vifs[col] = variance_inflation_factor(df, i)
return pd.Series(vifs)
2023-04-16 21:56:09 +10:00
def regress(model_class, df, dep, formula, *, nan_policy='warn', bool_baselevels=False, exp=None):
2022-10-17 21:41:19 +11:00
"""
2023-04-16 21:56:09 +10:00
Fit a statsmodels regression model
2022-10-17 21:41:19 +11:00
2023-04-16 21:56:09 +10:00
:param model_class: Type of regression model to fit
2023-04-16 23:52:12 +10:00
:type model_class: :class:`yli.regress.RegressionModel` subclass
2023-04-16 21:56:09 +10:00
:param df: Data to perform regression on
:type df: DataFrame
2023-04-17 22:38:44 +10:00
:param dep: Column(s) in *df* for the dependent variable (numeric)
2023-04-16 21:56:09 +10:00
:type dep: str
:param formula: Patsy formula for the regression model
:type formula: str
:param nan_policy: How to handle *nan* values (see :ref:`nan-handling`)
:type nan_policy: str
:param bool_baselevels: Show reference categories for boolean independent variables even if reference category is *False*
:type bool_baselevels: bool
:param exp: Report exponentiated parameters rather than raw parameters, default (*None*) is to autodetect based on *model_class*
:type exp: bool
2022-10-13 15:57:56 +11:00
2023-04-16 23:52:12 +10:00
:rtype: :class:`yli.regress.RegressionModel`
2023-04-16 23:52:12 +10:00
**Example:** See :class:`yli.OLS`, :class:`yli.Logit`, etc.
2022-10-13 15:57:56 +11:00
"""
2023-04-16 21:56:09 +10:00
if not any(x.__name__ == 'RegressionModel' for x in model_class.__bases__):
raise ValueError('model_class must be a RegressionModel')
df_ref = weakref.ref(df)
dmatrices, dep_categories = df_to_dmatrices(df, dep, formula, nan_policy)
# Fit model
result = model_class.fit(dmatrices[0], dmatrices[1])
result.df = df_ref
result.dep = dep
result.formula = formula
result.nan_policy = nan_policy
if exp is not None:
result.exp = exp
result.fitted_dt = datetime.now()
result.nobs = len(dmatrices[0])
# Process categorical terms, etc.
raw_terms = result.terms
result.terms = {}
for raw_name, raw_term in raw_terms.items():
# Rename terms
if raw_name == '(Intercept)':
# Already processed
result.terms[raw_name] = raw_term
elif raw_name == '(Cutoffs)':
result.terms[raw_name] = raw_term
if dep_categories is not None:
# Need to convert factorised names back into original names
old_cutoffs = raw_term.categories
raw_term.categories = {}
for cutoff_raw_name, cutoff_term in old_cutoffs.items():
bits = cutoff_raw_name.split('/')
term = dep_categories[round(float(bits[0]))] + '/' + dep_categories[round(float(bits[1]))]
result.terms[raw_name].categories[term] = cutoff_term
else:
# Parse if required
factor, column, contrast = parse_patsy_term(formula, df, raw_name)
if contrast is not None:
# Categorical term
if bool_baselevels is False and contrast == 'True' and set(df[column].unique()) == set([True, False]):
# Treat as single term
result.terms[column] = raw_term
else:
# Add a new categorical term if not exists
if column not in result.terms:
ref_category = formula_factor_ref_category(formula, df, factor)
result.terms[column] = CategoricalTerm({}, ref_category)
result.terms[column].categories[contrast] = raw_term
else:
# Single term
result.terms[column] = raw_term
return result
def df_to_dmatrices(df, dep, formula, nan_policy):
# Check for/clean NaNs in input columns
2023-04-17 22:38:44 +10:00
columns = cols_for_formula(dep, df) + cols_for_formula(formula, df)
2023-04-16 21:56:09 +10:00
df = df[columns]
df = check_nan(df, nan_policy)
# Ensure numeric type for dependent variable
2023-04-17 22:38:44 +10:00
if '+' in dep:
dep_categories = None
else:
df[dep], dep_categories = as_numeric(df[dep])
2023-04-16 21:56:09 +10:00
# Convert pandas nullable types for independent variables as this breaks statsmodels
df = convert_pandas_nullable(df)
# Construct design matrix from formula
dmatrices = patsy.dmatrices(dep + ' ~ ' + formula, df, return_type='dataframe')
return dmatrices, dep_categories
class RegressionModel:
def __init__(self):
# Model configuration (all set in yli.regress)
self.df = None
self.dep = None
self.formula = None
self.nan_policy = None
self.exp = False
self.cov_type = None
# Summary information
self.fitted_dt = None # Set in yli.regress
self.nobs = None # Set in yli.regress
self.nevents = None
self.dof_model = None
self.dof_resid = None
self.rsquared = None
self.ll_model = None
self.ll_null = None
self.f_statistic = None
# Parameters
self.terms = None
self.vcov = None
# Comments
self.comments = []
2022-10-13 15:57:56 +11:00
@property
2023-04-16 21:56:09 +10:00
def model_long_name(self):
return None
2022-10-13 15:57:56 +11:00
2023-04-16 21:56:09 +10:00
@property
def model_short_name(self):
return None
def _header_table(self, html):
"""Return the entries for the header table"""
2022-10-17 21:41:19 +11:00
2023-04-16 21:56:09 +10:00
# Left column
left_col = []
2022-10-13 15:57:56 +11:00
2023-04-16 21:56:09 +10:00
left_col.append(('Dep. Variable:', self.dep))
left_col.append(('Model:', self.model_short_name))
left_col.append(('Date:', self.fitted_dt.strftime('%Y-%m-%d')))
left_col.append(('Time:', self.fitted_dt.strftime('%H:%M:%S')))
if self.cov_type:
left_col.append(('Std. Errors:', 'Non-Robust' if self.cov_type == 'nonrobust' else self.cov_type.upper() if self.cov_type.startswith('hc') else self.cov_type))
2022-10-13 15:57:56 +11:00
2023-04-16 21:56:09 +10:00
# Right column
right_col = []
2022-10-17 21:41:19 +11:00
2023-04-16 21:56:09 +10:00
right_col.append(('No. Observations:', format(self.nobs, '.0f')))
if self.nevents:
right_col.append(('No. Events:', format(self.nevents, '.0f')))
if self.dof_model:
right_col.append(('Df. Model:', format(self.dof_model, '.0f')))
if self.dof_resid:
right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f')))
if self.rsquared:
right_col.append(('<i>R</i><sup>2</sup>:' if html else 'R²:', format(self.rsquared, '.2f')))
elif self.ll_null:
right_col.append(('Pseudo <i>R</i><sup>2</sup>:' if html else 'Pseudo R²:', format(self.pseudo_rsquared, '.2f')))
if self.f_statistic:
# Report the F test if available
f_result = self.ftest()
if html:
right_col.append(('<i>F</i>:', format(f_result.statistic, '.2f')))
right_col.append(('<i>p</i> (<i>F</i>):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML)))
else:
right_col.append(('F:', format(f_result.statistic, '.2f')))
right_col.append(('p (F):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY)))
else:
# Otherwise report likelihood ratio test as overall test
right_col.append(('LL-Model:', format(self.ll_model, '.2f')))
if self.ll_null:
lrtest_result = self.lrtest_null()
right_col.append(('LL-Null:', format(self.ll_null, '.2f')))
if html:
right_col.append(('<i>p</i> (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML)))
else:
right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY)))
2022-10-13 15:57:56 +11:00
2023-04-16 21:56:09 +10:00
return left_col, right_col
2022-10-13 15:57:56 +11:00
2023-04-16 21:56:09 +10:00
def __repr__(self):
if config.repr_is_summary:
return self.summary()
return super().__repr__()
def _repr_html_(self):
# Render header table
left_col, right_col = self._header_table(html=True)
2022-10-17 21:41:19 +11:00
2023-04-16 21:56:09 +10:00
out = '<table><caption>{} Results</caption>'.format(self.model_long_name)
for left_cell, right_cell in itertools.zip_longest(left_col, right_col):
out += '<tr><th>{}</th><td>{}</td><th>{}</th><td>{}</td></tr>'.format(
left_cell[0] if left_cell else '',
left_cell[1] if left_cell else '',
right_cell[0] if right_cell else '',
right_cell[1] if right_cell else ''
)
out += '</table>'
2022-10-17 21:41:19 +11:00
2023-04-16 21:56:09 +10:00
# Render coefficients table
out += '<table><tr><th></th><th style="text-align:center">{}</th><th colspan="3" style="text-align:center">({:g}% CI)</th><th style="text-align:center"><i>p</i></th></tr>'.format('exp(<i>β</i>)' if self.exp else '<i>β</i>', (1-config.alpha)*100)
2022-10-17 21:41:19 +11:00
2023-04-16 21:56:09 +10:00
for term_name, term in self.terms.items():
if isinstance(term, SingleTerm):
# Single term
# Exponentiate beta if requested
beta = term.beta
if self.exp:
beta = np.exp(beta)
out += '<tr><th>{}</th><td>{:.2f}</td><td style="padding-right:0">({:.2f}</td><td>–</td><td style="padding-left:0">{:.2f})</td><td style="text-align:left">{}</td></tr>'.format(term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
elif isinstance(term, CategoricalTerm):
# Categorical term
out += '<tr><th>{}</th><td></td><td style="padding-right:0"></td><td></td><td style="padding-left:0"></td><td></td></tr>'.format(term_name)
# Render reference category
if term.ref_category is not None:
out += '<tr><td style="text-align:right;font-style:italic">{}</td><td>Ref.</td><td style="padding-right:0"></td><td></td><td style="padding-left:0"></td><td></td></tr>'.format(term.ref_category)
# Loop over terms
for sub_term_name, sub_term in term.categories.items():
# Exponentiate beta if requested
beta = sub_term.beta
if self.exp:
beta = np.exp(beta)
out += '<tr><td style="text-align:right;font-style:italic">{}</td><td>{:.2f}</td><td style="padding-right:0">({:.2f}</td><td>–</td><td style="padding-left:0">{:.2f})</td><td style="text-align:left">{}</td></tr>'.format(sub_term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(sub_term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
else:
raise Exception('Attempt to render unknown term type')
2023-04-16 21:56:09 +10:00
out += '</table>'
2023-04-16 21:56:09 +10:00
# TODO: Have a detailed view which shows SE, t/z, etc.
2023-04-16 21:56:09 +10:00
if self.comments:
out += '<ol>'
for comment in self.comments:
out += '<li>{}</li>'.format(comment)
out += '</ol>'
2023-04-16 21:56:09 +10:00
return out
2023-04-16 21:56:09 +10:00
def summary(self):
"""
2023-04-16 21:56:09 +10:00
Return a stringified summary of the regression model
2023-04-16 21:56:09 +10:00
:rtype: str
"""
2023-04-16 21:56:09 +10:00
# Render header table
left_col, right_col = self._header_table(html=False)
2023-04-16 21:56:09 +10:00
# Ensure equal sizes for SimpleTable
if len(right_col) > len(left_col):
left_col.extend([('', '')] * (len(right_col) - len(left_col)))
elif len(left_col) > len(right_col):
right_col.extend([('', '')] * (len(left_col) - len(right_col)))
2023-04-16 21:56:09 +10:00
table1 = SimpleTable(np.concatenate([left_col, right_col], axis=1), title='{} Results'.format(self.model_long_name))
table1.insert_stubs(2, [' | '] * len(left_col))
2023-04-16 21:56:09 +10:00
# Get rid of last line (merge with next table)
table1_lines = table1.as_text().splitlines(keepends=False)
out = '\n'.join(table1_lines[:-1]) + '\n'
2023-04-16 21:56:09 +10:00
# Render coefficients table
table_data = []
2023-04-16 21:56:09 +10:00
for term_name, term in self.terms.items():
if isinstance(term, SingleTerm):
# Single term
2023-04-16 21:56:09 +10:00
# Exponentiate beta if requested
beta = term.beta
if self.exp:
beta = np.exp(beta)
2023-04-16 21:56:09 +10:00
# Add some extra padding
table_data.append([term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(term.pvalue, PValueStyle.TABULAR)])
elif isinstance(term, CategoricalTerm):
# Categorical term
table_data.append([term_name + ' ', '', '', '', '', ''])
# Render reference category
if term.ref_category is not None:
table_data.append(['{} '.format(term.ref_category), 'Ref.', '', '', '', ''])
# Loop over terms
for sub_term_name, sub_term in term.categories.items():
# Exponentiate beta if requested
beta = sub_term.beta
if self.exp:
beta = np.exp(beta)
table_data.append([sub_term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(sub_term.pvalue, PValueStyle.TABULAR)])
else:
raise Exception('Attempt to render unknown term type')
2023-04-16 21:56:09 +10:00
table2 = SimpleTable(data=table_data, headers=['', 'exp(β)' if self.exp else 'β', '', '\ue000', '', ' p']) # U+E000 is in Private Use Area, mark middle of CI column
table2_text = table2.as_text().replace(' \ue000 ', '({:g}% CI)'.format((1-config.alpha)*100)) # Render heading in the right spot
table2_lines = table2_text.splitlines(keepends=False)
2023-04-16 21:56:09 +10:00
# Render divider line between 2 tables
max_table_len = max(len(table1_lines[-1]), len(table2_lines[-1]))
out += '=' * max_table_len + '\n'
2023-04-16 21:56:09 +10:00
out += '\n'.join(table2_lines[1:])
2023-04-16 21:56:09 +10:00
if self.comments:
out += '\n'
for i, comment in enumerate(self.comments):
out += '\n{}. {}'.format(i + 1, comment)
2023-04-16 21:56:09 +10:00
return out
2023-04-16 21:56:09 +10:00
# --------------------
# Post hoc tests, etc.
def bayesfactor_beta_zero(self, term):
"""
2023-04-16 21:56:09 +10:00
Compute a Bayes factor testing the hypothesis that the given beta is 0
2023-04-16 21:56:09 +10:00
Uses the R *BFpack* library.
2023-04-16 21:56:09 +10:00
Requires the regression to be from statsmodels.
2023-04-16 23:52:12 +10:00
The term must be specified as the *raw name* from the statsmodels regression, available via :attr:`SingleTerm.raw_name`.
2023-04-16 21:56:09 +10:00
:param term: Raw name of the term to be tested
:type term: str
2023-04-16 21:56:09 +10:00
:rtype: :class:`yli.bayes_factors.BayesFactor`
"""
2023-04-16 21:56:09 +10:00
# FIXME: Allow specifying our renamed terms
2023-04-16 21:56:09 +10:00
# Get parameters required for AFBF
raw_params = {}
for t in self.terms.values():
if isinstance(t, CategoricalTerm):
for t in t.categories.values():
raw_params[t.raw_name.replace('[', '_').replace(']', '_')] = t.beta.point
else:
raw_params[t.raw_name.replace('[', '_').replace(']', '_')] = t.beta.point
2023-04-16 21:56:09 +10:00
# Compute BF matrix
bf01 = bayesfactor_afbf(pd.Series(raw_params), self.vcov, self.nobs, '{} = 0'.format(term.replace('[', '_').replace(']', '_')))
bf01 = BayesFactor(bf01.factor, '0', '{} = 0'.format(term), '1', '{} ≠ 0'.format(term))
2023-04-16 21:56:09 +10:00
if bf01.factor >= 1:
return bf01
else:
return bf01.invert()
def ftest(self):
"""
Perform the *F* test that all slopes are 0
2023-04-16 21:56:09 +10:00
:rtype: :class:`yli.sig_tests.FTestResult`
"""
pvalue = 1 - stats.f(self.dof_model, self.dof_resid).cdf(self.f_statistic)
return FTestResult(self.f_statistic, self.dof_model, self.dof_resid, pvalue)
def lrtest_null(self):
"""
Compute the likelihood ratio test comparing the model to the null model
2023-04-16 21:56:09 +10:00
:rtype: :class:`LikelihoodRatioTestResult`
"""
2023-04-16 21:56:09 +10:00
statistic = -2 * (self.ll_null - self.ll_model)
pvalue = 1 - stats.chi2.cdf(statistic, self.dof_model)
LikelihoodRatioTestResult
return LikelihoodRatioTestResult(statistic, self.dof_model, pvalue)
@property
def pseudo_rsquared(self):
"""McFadden's pseudo *R*:sup:`2` statistic"""
2023-04-16 21:56:09 +10:00
return 1 - self.ll_model/self.ll_null
def shap(self, **kwargs):
2023-02-25 23:46:48 +11:00
"""
Compute SHAP values for the model
Uses the Python *shap* library.
:param kwargs: Keyword arguments to pass to *shap.LinearExplainer*
:rtype: :class:`yli.shap.ShapResult`
**Reference:** Lundberg SM, Lee SI. A unified approach to interpreting model predictions. In: Guyon I, Von Luxburg U, Bengio S, et al., editors. *Advances in Neural Information Processing Systems*; 2017 Dec 49; Long Beach, CA. https://proceedings.neurips.cc/paper/2017/hash/8a20a8621978632d76c43dfd28b67767-Abstract.html
"""
import shap
xdata = ShapResult._get_xdata(self)
# Combine terms into single list
params = []
for term in self.terms.values():
if isinstance(term, SingleTerm):
params.append(term.beta.point)
else:
params.extend(s.beta.point for s in term.categories.values())
explainer = shap.LinearExplainer((np.array(params[1:]), params[0]), xdata, **kwargs) # FIXME: Assumes zeroth term is intercept
shap_values = explainer.shap_values(xdata).astype('float')
return ShapResult(weakref.ref(self), shap_values, list(xdata.columns))
2023-04-16 21:56:09 +10:00
class LikelihoodRatioTestResult(ChiSquaredResult):
"""
Result of a likelihood ratio test for regression
2023-04-16 23:52:12 +10:00
See :meth:`RegressionModel.lrtest_null`.
2023-04-16 21:56:09 +10:00
"""
2022-10-13 15:57:56 +11:00
2023-04-16 21:56:09 +10:00
def __init__(self, statistic, dof, pvalue):
super().__init__(statistic, dof, pvalue)
2022-10-13 15:57:56 +11:00
def _repr_html_(self):
2023-04-16 21:56:09 +10:00
return 'LR({}) = {:.2f}; <i>p</i> {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION | PValueStyle.HTML))
2022-10-13 15:57:56 +11:00
def summary(self):
2022-10-17 21:41:19 +11:00
"""
2023-04-16 21:56:09 +10:00
Return a stringified summary of the likelihood ratio test
2022-10-17 21:41:19 +11:00
:rtype: str
"""
2023-04-16 21:56:09 +10:00
return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION))
2022-10-13 15:57:56 +11:00
class SingleTerm:
2023-04-16 21:56:09 +10:00
"""A term in a :class:`RegressionModel` which is a single term"""
def __init__(self, raw_name, beta, pvalue):
2023-04-16 23:52:12 +10:00
#: Raw name of the term (*str*)
self.raw_name = raw_name
2022-10-17 21:41:19 +11:00
#: :class:`yli.utils.Estimate` of the coefficient
self.beta = beta
2022-10-17 21:41:19 +11:00
#: *p* value for the coefficient (*float*)
self.pvalue = pvalue
class CategoricalTerm:
2023-04-16 21:56:09 +10:00
"""A group of terms in a :class:`RegressionModel` corresponding to a categorical variable"""
def __init__(self, categories, ref_category):
2022-10-17 21:41:19 +11:00
#: Terms for each of the categories, excluding the reference category (*dict* of :class:`SingleTerm`)
self.categories = categories
2022-10-17 21:41:19 +11:00
#: Name of the reference category (*str*)
self.ref_category = ref_category
2023-04-16 21:56:09 +10:00
def raw_terms_from_statsmodels_result(raw_result):
return {
('(Intercept)' if raw_name == 'Intercept' else raw_name): SingleTerm(
raw_name=raw_name,
beta=Estimate(raw_param, raw_ci[0], raw_ci[1]),
pvalue=raw_p
)
for raw_name, raw_param, raw_ci, raw_p in zip(raw_result.model.exog_names, raw_result.params.values, raw_result.conf_int(config.alpha).values, raw_result.pvalues.values)
}
# ------------------------
# Concrete implementations
2023-04-17 22:38:44 +10:00
class IntervalCensoredCox(RegressionModel):
"""
Interval-censored Cox regression
Uses hpstat *intcox* command.
**Example:**
.. code-block::
df = pd.DataFrame(...)
yli.regress(yli.IntervalCensoredCox, df, 'Left_Time + Right_Time', 'A + B + C + D + E + F + G + H')
.. code-block:: text
Interval-Censored Cox Regression Results
===================================================================
Dep. Variable: Left_Time + Right_Time | No. Observations: 1124
Model: Interval-Censored Cox | No. Events: 133
Date: 2023-04-17 | Df. Model: 8
Time: 22:30:40 | Pseudo : 0.00
Std. Errors: OPG | LL-Model: -613.21
| LL-Null: -615.81
| p (LR): 0.82
===================================================================
exp(β) (95% CI) p
----------------------------------
A 0.83 (0.37 - 1.88) 0.66
B 1.08 (0.81 - 1.46) 0.60
C 0.49 (0.24 - 1.00) 0.052
D 0.79 (0.42 - 1.50) 0.48
E 0.87 (0.40 - 1.85) 0.71
F 0.64 (0.28 - 1.45) 0.29
G 1.07 (0.44 - 2.62) 0.88
H 1.23 (0.48 - 3.20) 0.67
----------------------------------
The output summarises the result of the regression.
**Reference:** Zeng D, Mao L, Lin DY. Maximum likelihood estimation for semiparametric transformation models with interval-censored data. *Biometrika*. 2016;103(2):25371. `doi:10.1093/biomet/asw013 <https://doi.org/10.1093/biomet/asw013>`_
"""
@property
def model_long_name(self):
return 'Interval-Censored Cox Regression'
@property
def model_short_name(self):
return 'Interval-Censored Cox'
@classmethod
def fit(cls, data_dep, data_ind):
if len(data_dep.columns) != 2:
raise ValueError('IntervalCensoredCox requires left and right times')
# Drop explicit intercept term
if 'Intercept' in data_ind:
del data_ind['Intercept']
result = cls()
result.exp = True
result.cov_type = 'OPG'
result.nevents = np.isfinite(data_dep.iloc[:, 1]).sum()
result.dof_model = len(data_ind.columns)
# Export data to CSV
csv_buf = io.StringIO()
data_dep.join(data_ind).to_csv(csv_buf, index=False)
csv_str = csv_buf.getvalue()
# Run intcens binary
proc = subprocess.run([config.hpstat_path, 'intcox', '-', '--output', 'json'], input=csv_str, capture_output=True, encoding='utf-8', check=True)
raw_result = json.loads(proc.stdout)
z_critical = -stats.norm.ppf(config.alpha / 2)
result.terms = {raw_name: SingleTerm(
raw_name=raw_name,
beta=Estimate(raw_param, raw_param - z_critical * raw_se, raw_param + z_critical * raw_se),
pvalue=stats.norm.cdf(-np.abs(raw_param) / raw_se) * 2
) for raw_name, raw_param, raw_se in zip(data_ind.columns, raw_result['params'], raw_result['params_se'])}
result.ll_model = raw_result['ll_model']
result.ll_null = raw_result['ll_null']
return result
2023-04-16 21:56:09 +10:00
class Logit(RegressionModel):
2023-04-16 23:52:12 +10:00
"""
Logistic regression
**Example:**
.. code-block::
df = pd.DataFrame({
'Unhealthy': [False, False, False, ...],
'Fibrinogen': [2.52, 2.46, 2.29, ...],
'GammaGlobulin': [38, 36, 36, ...]
})
yli.regress(yli.Logit, df, 'Unhealthy', 'Fibrinogen + GammaGlobulin')
.. code-block:: text
Logistic Regression Results
======================================================
Dep. Variable: Unhealthy | No. Observations: 32
Model: Logit | Df. Model: 2
Date: 2022-10-18 | Df. Residuals: 29
Time: 19:00:34 | Pseudo : 0.26
Std. Errors: Non-Robust | LL-Model: -11.47
| LL-Null: -15.44
| p (LR): 0.02*
======================================================
exp(β) (95% CI) p
-----------------------------------------------
(Intercept) 0.00 (0.00 - 0.24) 0.03*
Fibrinogen 6.80 (1.01 - 45.79) 0.049*
GammaGlobulin 1.17 (0.92 - 1.48) 0.19
-----------------------------------------------
The output summarises the results of the regression.
Note that the parameter estimates are automatically exponentiated.
For example, the odds ratio for unhealthiness per unit increase in fibrinogen is 6.80, with 95% confidence interval 1.0145.79, and is significant with *p* value 0.049.
"""
2023-04-16 21:56:09 +10:00
@property
def model_long_name(self):
return 'Logistic Regression'
2023-04-16 21:56:09 +10:00
@property
def model_short_name(self):
return 'Logit'
2023-04-16 21:56:09 +10:00
@classmethod
def fit(cls, data_dep, data_ind):
result = cls()
result.exp = True
result.cov_type = 'nonrobust'
2023-04-16 21:56:09 +10:00
# Perform regression
raw_result = sm.Logit(endog=data_dep, exog=data_ind, missing='raise').fit(disp=False)
2023-04-16 21:56:09 +10:00
result.dof_model = raw_result.df_model
result.dof_resid = raw_result.df_resid
result.ll_model = raw_result.llf
result.ll_null = raw_result.llnull
result.terms = raw_terms_from_statsmodels_result(raw_result)
result.vcov = raw_result.cov_params()
return result
class OLS(RegressionModel):
2023-04-16 23:52:12 +10:00
"""
Ordinary least squares linear regression
**Example:**
.. code-block::
df = pd.DataFrame(...)
yli.regress(yli.OLS, df, 'LNC', 'D + T1 + T2 + S + PR + NE + CT + BW + N + PT')
.. code-block:: text
Ordinary Least Squares Regression Results
=======================================================
Dep. Variable: LNC | No. Observations: 32
Model: OLS | Df. Model: 10
Date: 2023-04-16 | Df. Residuals: 21
Time: 23:34:01 | : 0.86
Std. Errors: Non-Robust | F: 13.28
| p (F): <0.001*
=======================================================
β (95% CI) p
----------------------------------------------
(Intercept) -10.63 (-22.51 - 1.24) 0.08
D 0.23 (0.05 - 0.41) 0.02*
T1 0.01 (-0.04 - 0.05) 0.82
T2 0.01 (-0.00 - 0.02) 0.24
S 0.00 (0.00 - 0.00) <0.001*
PR -0.11 (-0.28 - 0.07) 0.21
NE 0.26 (0.09 - 0.42) 0.004*
CT 0.12 (-0.03 - 0.26) 0.12
BW 0.04 (-0.18 - 0.26) 0.73
N -0.01 (-0.03 - 0.00) 0.14
PT -0.22 (-0.49 - 0.05) 0.10
----------------------------------------------
The output summarises the results of the regression.
For example, the mean difference in "LNC" per unit increase in "D" is 0.23, with 95% confidence interval 0.050.41, and is significant with *p* value 0.02.
"""
2023-04-16 21:56:09 +10:00
@property
def model_long_name(self):
return 'Ordinary Least Squares Regression'
2022-10-13 15:57:56 +11:00
2023-04-16 21:56:09 +10:00
@property
def model_short_name(self):
return 'OLS'
2023-04-16 21:56:09 +10:00
@classmethod
def fit(cls, data_dep, data_ind):
result = cls()
result.cov_type = 'nonrobust'
2023-04-16 21:56:09 +10:00
# Perform regression
raw_result = sm.OLS(endog=data_dep, exog=data_ind, missing='raise', hasconst=True).fit()
2022-11-29 14:40:50 +11:00
2023-04-16 21:56:09 +10:00
result.dof_model = raw_result.df_model
result.dof_resid = raw_result.df_resid
result.rsquared = raw_result.rsquared
result.ll_model = raw_result.llf
result.f_statistic = raw_result.fvalue
2022-11-29 14:40:50 +11:00
2023-04-16 21:56:09 +10:00
result.terms = raw_terms_from_statsmodels_result(raw_result)
result.vcov = raw_result.cov_params()
2022-11-29 14:40:50 +11:00
2022-10-13 17:23:29 +11:00
return result
2023-04-16 21:56:09 +10:00
class OrdinalLogit(RegressionModel):
2022-10-17 21:41:19 +11:00
"""
2023-04-16 21:56:09 +10:00
Ordinal logistic (or probit) regression
2022-10-17 21:41:19 +11:00
2023-04-16 21:56:09 +10:00
The implementation calls statsmodels' native *OrderedModel*, but substitutes an alternative parameterisation used by R and Stata.
In the native statsmodels implementation, the first cutoff parameter is the true cutoff, but further cutoff parameter are log differences between consecutive cutoffs.
In this parameterisation, cutoff terms are represented directly in the model.
2023-04-16 21:56:09 +10:00
**Example:**
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
.. code-block::
df = pd.DataFrame(...)
yli.regress(yli.OrdinalLogit, df, 'apply', 'pared + public + gpa', exp=False)
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
.. code-block:: text
2023-04-16 23:52:12 +10:00
Ordinal Logistic Regression Results
==========================================================
Dep. Variable: apply | No. Observations: 400
Model: Ordinal Logit | Df. Model: 5
Date: 2022-12-02 | Df. Residuals: 395
Time: 21:30:38 | Pseudo : 0.03
Std. Errors: Non-Robust | LL-Model: -358.51
| LL-Null: -370.60
| p (LR): <0.001*
============================================================
2023-04-16 21:56:09 +10:00
β (95% CI) p
------------------------------------------------------------
pared 1.05 (0.53 - 1.57) <0.001*
public -0.06 (-0.64 - 0.53) 0.84
gpa 0.62 (0.10 - 1.13) 0.02*
(Cutoffs)
unlikely/somewhat likely 2.20 (0.68 - 3.73) 0.005*
somewhat likely/very likely 4.30 (2.72 - 5.88) <0.001*
------------------------------------------------------------
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
The output summarises the result of the regression.
The parameters shown under "(Cutoffs)" are the cutoff values in the latent variable parameterisation of ordinal regression.
Note that because `exp=False` was passed, the parameter estimates are not automatically exponentiated.
2023-03-05 02:11:12 +11:00
"""
2023-04-16 21:56:09 +10:00
@property
def model_long_name(self):
return 'Ordinal Logistic Regression'
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
@property
def model_short_name(self):
return 'Ordinal Logit'
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
@classmethod
def fit(cls, data_dep, data_ind):
result = cls()
result.cov_type = 'nonrobust'
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
# Drop explicit intercept term
if 'Intercept' in data_ind:
del data_ind['Intercept']
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
# Perform regression
raw_result = _SMOrdinalLogit(endog=data_dep, exog=data_ind, missing='raise').fit(disp=False)
result.dof_model = raw_result.df_model
result.dof_resid = raw_result.df_resid
result.ll_model = raw_result.llf
result.ll_null = raw_result.llnull
raw_terms = raw_terms_from_statsmodels_result(raw_result)
result.terms = {}
# Group ordinal regression cutoffs
for raw_name, raw_term in raw_terms.items():
if '/' in raw_name:
if '(Cutoffs)' not in result.terms:
result.terms['(Cutoffs)'] = CategoricalTerm({}, None)
result.terms['(Cutoffs)'].categories[raw_name] = raw_term
else:
result.terms[raw_name] = raw_term
return result
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
def brant(self):
2023-03-05 02:11:12 +11:00
"""
2023-04-16 21:56:09 +10:00
Perform the Brant test for the parallel regression assumption in ordinal regression
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
:rtype: :class:`BrantResult`
**Example:**
.. code-block::
df = pd.DataFrame(...)
model = yli.regress(yli.OrdinalLogit, df, 'apply', 'pared + public + gpa', exp=False)
model.brant()
.. code-block:: text
χ² df p
Omnibus 4.34 3 0.23
pared 0.13 1 0.72
public 3.44 1 0.06
gpa 0.18 1 0.67
The output shows the result of the Brant test. For example, for the omnibus test of the parallel regression assumption across all independent variables, the *χ*:sup:`2` statistic is 4.34, the *χ*:sup:`2` distribution has 3 degrees of freedom, and the test is not significant, with *p* value 0.23.
**Reference:** Brant R. Assessing proportionality in the proportional odds model for ordinal logistic regression. *Biometrics*. 1990;46(4):11718. `doi:10.2307/2532457 <https://doi.org/10.2307/2532457>`_
2023-03-05 02:11:12 +11:00
"""
2023-04-16 21:56:09 +10:00
df = self.df()
if df is None:
raise Exception('Referenced DataFrame has been dropped')
dep = self.dep
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
# Precompute design matrices
dmatrices, dep_categories = df_to_dmatrices(df, dep, self.formula, 'omit')
s_dep = dmatrices[0][dmatrices[0].columns[0]] # df[dep] as series - must get this from dmatrices to account for as_numeric
dmatrix_right = dmatrices[1]
dmatrix_right.reset_index(drop=True, inplace=True) # Otherwise this confuses matrix multiplication
# Fit individual logistic regressions
logit_models = []
for upper_limit in sorted(s_dep.unique())[:-1]:
dep_dichotomous = (s_dep <= upper_limit).astype(int).reset_index(drop=True)
logit_result = sm.Logit(dep_dichotomous, dmatrix_right).fit(disp=False)
if not logit_result.mle_retvals['converged']:
raise Exception('Maximum likelihood estimation failed to converge for {} <= {}. Check raw_result.mle_retvals.'.format(dep, upper_limit))
if pd.isna(logit_result.bse).any():
raise Exception('Regression returned NaN standard errors for {} <= {}.'.format(dep, upper_limit))
logit_models.append(logit_result)
logit_betas = np.array([model._results.params for model in logit_models]).T
logit_pihat = np.array([expit(-model.fittedvalues) for model in logit_models]).T # Predicted probabilities
# vcov is the variance-covariance matrix of all individually fitted betas across all terms
# | model 1 | model 2 | model 3 | ...
# | term 1 | term 2 | term 1 | term 2 | term 1 | term 2 | ...
# model 1 | term 1 |
# | term 2 |
# model 2 | term 1 |
# | term 2 |
# ...
n_terms = len(dmatrix_right.columns) - 1 # number of beta terms (excluding intercept)
n_betas = len(logit_models) * n_terms
vcov = np.zeros((n_betas, n_betas))
# Populate the variance-covariance matrix for comparisons between individually fitted models
for j in range(0, len(logit_models) - 1):
for l in range(j + 1, len(logit_models)):
Wjj = np.diag(logit_pihat[:,j] - logit_pihat[:,j] * logit_pihat[:,j])
Wjl = np.diag(logit_pihat[:,l] - logit_pihat[:,j] * logit_pihat[:,l])
Wll = np.diag(logit_pihat[:,l] - logit_pihat[:,l] * logit_pihat[:,l])
matrix_result = np.linalg.inv(dmatrix_right.T @ Wjj @ dmatrix_right) @ dmatrix_right.T @ Wjl @ dmatrix_right @ np.linalg.inv(dmatrix_right.T @ Wll @ dmatrix_right)
j_vs_l_vcov = np.asarray(matrix_result)[1:,1:] # Asymptotic covariance for j,l
vcov[j*n_terms:(j+1)*n_terms, l*n_terms:(l+1)*n_terms] = j_vs_l_vcov
vcov[l*n_terms:(l+1)*n_terms, j*n_terms:(j+1)*n_terms] = j_vs_l_vcov
# Populate the variance-covariance matrix within each individually fitted model
for i in range(len(logit_models)):
vcov[i*n_terms:(i+1)*n_terms, i*n_terms:(i+1)*n_terms] = logit_models[i]._results.cov_params()[1:,1:]
# ------------------
# Perform Wald tests
beta_names = ['{}_{}'.format(raw_name, i) for i in range(len(logit_models)) for raw_name in dmatrix_right.columns[1:]]
wald_results = {}
# Omnibus test
constraints = [' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models))) for raw_name in dmatrix_right.columns[1:]]
constraint = ', '.join(constraints)
dof = (len(logit_models) - 1) * (len(dmatrix_right.columns) - 1) # df = (number of levels minus 2) * (number of terms excluding intercept)
wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, dof)
wald_results['Omnibus'] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue)
# Individual terms
for raw_name in dmatrix_right.columns[1:]:
constraint = ' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models)))
dof = len(logit_models) - 1 # df = (number of levels minus 2)
wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, dof)
wald_results[raw_name] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue)
return BrantResult(wald_results)
2023-03-05 02:11:12 +11:00
2023-04-16 21:56:09 +10:00
class _SMOrdinalLogit(statsmodels.miscmodels.ordinal_model.OrderedModel):
def __init__(self, endog, exog, **kwargs):
if 'distr' not in kwargs:
kwargs['distr'] = 'logit'
super().__init__(endog, exog, **kwargs)
def transform_threshold_params(self, params):
th_params = params[-(self.k_levels - 1):]
thresh = np.concatenate(([-np.inf], th_params, [np.inf]))
return thresh
def transform_reverse_threshold_params(self, params):
return params[:-1]
2022-10-13 17:23:29 +11:00
2023-04-16 21:56:09 +10:00
class PenalisedLogit(RegressionModel):
2022-10-13 17:23:29 +11:00
"""
2023-04-16 21:56:09 +10:00
Firth penalised logistic regression
2022-10-17 21:41:19 +11:00
Uses the R *logistf* library.
2022-10-13 17:23:29 +11:00
**Example:**
.. code-block::
df = pd.DataFrame({
'Pred': [1] * 20 + [0] * 220,
'Outcome': [1] * 40 + [0] * 200
})
yli.regress(yli.PenalisedLogit, df, 'Outcome', 'Pred', exp=False)
.. code-block:: text
2023-04-16 23:52:12 +10:00
Penalised Logistic Regression Results
============================================================
Dep. Variable: Outcome | No. Observations: 240
Model: Penalised Logit | Df. Model: 1
Date: 2022-10-19 | Pseudo : 0.37
Time: 07:50:40 | LL-Model: -66.43
Std. Errors: Non-Robust | LL-Null: -105.91
| p (LR): <0.001*
============================================================
β (95% CI) p
---------------------------------------------
(Intercept) -2.28 (-2.77 - -1.85) <0.001*
Pred 5.99 (3.95 - 10.85) <0.001*
---------------------------------------------
The output summarises the result of the regression.
The summary table shows that a penalised method was applied.
Note that because `exp=False` was passed, the parameter estimates are not automatically exponentiated.
2022-10-13 17:23:29 +11:00
"""
2023-04-16 21:56:09 +10:00
@property
def model_long_name(self):
return 'Penalised Logistic Regression'
@property
def model_short_name(self):
return 'Penalised Logit'
@classmethod
def fit(cls, data_dep, data_ind):
result = cls()
result.cov_type = 'nonrobust'
2022-10-13 17:23:29 +11:00
import rpy2.robjects as ro
import rpy2.robjects.packages
import rpy2.robjects.pandas2ri
2023-04-16 21:56:09 +10:00
df = data_dep.join(data_ind)
2022-10-13 17:23:29 +11:00
2023-04-16 21:56:09 +10:00
# Drop explicit intercept term
if 'Intercept' in df:
del df['Intercept']
2022-10-13 17:23:29 +11:00
# Import logistf
ro.packages.importr('logistf')
with ro.conversion.localconverter(ro.default_converter + ro.pandas2ri.converter):
with ro.local_context() as lc:
# Convert DataFrame to R
lc['df'] = df
# Transfer other parameters to R
2023-04-16 21:56:09 +10:00
lc['formula_'] = df.columns[0] + ' ~ ' + ' + '.join(df.columns[1:])
lc['alpha'] = config.alpha
2022-10-13 17:23:29 +11:00
# Fit the model
model = ro.r('logistf(formula_, data=df, alpha=alpha)')
2022-10-13 17:23:29 +11:00
2023-04-16 21:56:09 +10:00
result.dof_model = model['df'][0]
result.ll_model = model['loglik'][0]
result.ll_null = model['loglik'][1]
2022-10-13 17:23:29 +11:00
2023-04-16 21:56:09 +10:00
result.terms = {t: SingleTerm(t, Estimate(b, ci0, ci1), p) for t, b, ci0, ci1, p in zip(model['terms'], model['coefficients'], model['ci.lower'], model['ci.upper'], model['prob'])}
return result
2022-12-02 12:53:40 +11:00
2023-04-16 21:56:09 +10:00
# ------------------
# Brant test helpers
2023-04-16 21:56:09 +10:00
class _Dummy: pass
def _wald_test(param_names, params, formula, vcov, df):
# Hack! Piggyback off statsmodels to compute a Wald test
2023-04-16 21:56:09 +10:00
lmr = statsmodels.base.model.LikelihoodModelResults(model=None, params=None)
lmr.model = _Dummy()
lmr.model.data = _Dummy()
lmr.model.data.cov_names = param_names
lmr.params = params
lmr.df_resid = df
2022-12-02 21:43:05 +11:00
2023-04-16 21:56:09 +10:00
return lmr.wald_test(formula, cov_p=vcov, use_f=False, scalar=True)
class BrantResult:
"""
Result of a Brant test for ordinal regression
2022-12-02 21:43:05 +11:00
2023-04-16 21:56:09 +10:00
See :meth:`OrdinalLogit.brant`.
"""
2022-12-02 21:43:05 +11:00
2023-04-16 21:56:09 +10:00
def __init__(self, tests):
#: Results for Brant test on each coefficient (*Dict[str, ChiSquaredResult]*)
self.tests = tests
2022-12-02 21:43:05 +11:00
2023-04-16 21:56:09 +10:00
def __repr__(self):
if config.repr_is_summary:
return self.summary()
return super().__repr__()
2023-04-16 21:56:09 +10:00
def _repr_html_(self):
out = '<table><caption>Brant Test Results</caption><thead><tr><th></th><th style="text-align:center"><i>χ</i><sup>2</sup></th><th style="text-align:center">df</th><th style="text-align:center"><i>p</i></th></thead><tbody>'
2023-04-16 21:56:09 +10:00
for raw_name, test in self.tests.items():
out += '<tr><th>{}</th><td>{:.2f}</td><td>{:.0f}</td><td style="text-align:left">{}</td></tr>'.format(raw_name, test.statistic, test.dof, fmt_p(test.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
out += '</tbody></table>'
return out
2023-04-16 21:56:09 +10:00
def summary(self):
"""
Return a stringified summary of the *χ*:sup:`2` test
:rtype: str
"""
table = pd.DataFrame([
['{:.2f}'.format(test.statistic), '{:.0f}'.format(test.dof), fmt_p(test.pvalue, PValueStyle.TABULAR)]
for test in self.tests.values()
], index=self.tests.keys(), columns=['χ² ', 'df', 'p '])
return str(table)