1500 lines
50 KiB
Python
1500 lines
50 KiB
Python
# scipy-yli: Helpful SciPy utilities and recipes
|
|
# Copyright © 2022–2023 Lee Yingtong Li (RunasSudo)
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import patsy
|
|
from scipy import stats
|
|
from scipy.special import expit
|
|
import statsmodels, statsmodels.base.model, statsmodels.miscmodels.ordinal_model
|
|
import statsmodels.api as sm
|
|
from statsmodels.iolib.table import SimpleTable
|
|
from statsmodels.stats.outliers_influence import variance_inflation_factor
|
|
|
|
from datetime import datetime
|
|
import io
|
|
import itertools
|
|
import json
|
|
import subprocess
|
|
import weakref
|
|
|
|
from .bayes_factors import BayesFactor, bayesfactor_afbf
|
|
from .config import config
|
|
from .shap import ShapResult
|
|
from .sig_tests import ChiSquaredResult, FTestResult
|
|
from .utils import Estimate, PValueStyle, as_numeric, check_nan, cols_for_formula, convert_pandas_nullable, fmt_p, formula_factor_ref_category, parse_patsy_term
|
|
|
|
# TODO: Documentation
|
|
# TODO: Bootstrap
|
|
|
|
def vif(df, formula=None, *, nan_policy='warn'):
|
|
"""
|
|
Calculate the variance inflation factor (VIF) for each variable in *df*
|
|
|
|
:param df: Data to calculate the VIF for
|
|
:type df: DataFrame
|
|
:param formula: If specified, calculate the VIF only for the variables in the formula
|
|
:type formula: str
|
|
|
|
:return: The variance inflation factors
|
|
:rtype: Series
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame({
|
|
'D': [68.58, 67.33, 67.33, ...],
|
|
'T1': [14, 10, 10, ...],
|
|
'T2': [46, 73, 85, ...],
|
|
...
|
|
})
|
|
yli.vif(df[['D', 'T1', 'T2', ...]])
|
|
|
|
.. code-block:: text
|
|
|
|
D 8.318301
|
|
T1 6.081590
|
|
T2 2.457122
|
|
...
|
|
dtype: float64
|
|
|
|
The output shows the variance inflation factor for each variable in *df*.
|
|
"""
|
|
|
|
if formula:
|
|
# Only consider columns in the formula
|
|
df = df[cols_for_formula(formula, df)]
|
|
|
|
# Check for/clean NaNs
|
|
df = check_nan(df, nan_policy)
|
|
|
|
# Convert all to float64 otherwise statsmodels chokes with "ufunc 'isfinite' not supported for the input types ..."
|
|
df = pd.get_dummies(df, drop_first=True) # Convert categorical dtypes
|
|
df = df.astype('float64') # Convert all other dtypes
|
|
|
|
# Add intercept column
|
|
orig_columns = list(df.columns)
|
|
df['Intercept'] = [1] * len(df)
|
|
|
|
vifs = {}
|
|
|
|
for i, col in enumerate(orig_columns):
|
|
vifs[col] = variance_inflation_factor(df, i)
|
|
|
|
return pd.Series(vifs)
|
|
|
|
def regress(
|
|
model_class, df, dep, formula,
|
|
*,
|
|
nan_policy='warn',
|
|
exposure=None, family=None, status=None,
|
|
method=None, maxiter=None, start_params=None, tolerance=None,
|
|
reduced=None,
|
|
bool_baselevels=False, exp=None
|
|
):
|
|
"""
|
|
Fit a statsmodels regression model
|
|
|
|
:param model_class: Type of regression model to fit
|
|
:type model_class: :class:`yli.regress.RegressionModel` subclass
|
|
:param df: Data to perform regression on
|
|
:type df: DataFrame
|
|
:param dep: Column(s) in *df* for the dependent variable (numeric)
|
|
:type dep: str
|
|
:param formula: Patsy formula for the regression model
|
|
:type formula: str
|
|
:param nan_policy: How to handle *nan* values (see :ref:`nan-handling`)
|
|
:type nan_policy: str
|
|
:param exposure: Column in *df* for the exposure variable (numeric, some models only)
|
|
:type exposure: str
|
|
:param family: See :class:`yli.GLM`
|
|
:type family: str
|
|
:param status: See :class:`yli.Cox`
|
|
:type status: str
|
|
:param method: See statsmodels *model.fit*
|
|
:param maxiter: See statsmodels *model.fit*
|
|
:param start_params: See statsmodels *model.fit*
|
|
:param tolerance: See statsmodels *model.fit*
|
|
:param reduced: See :meth:`yli.IntervalCensoredCox`
|
|
:param bool_baselevels: Show reference categories for boolean independent variables even if reference category is *False*
|
|
:type bool_baselevels: bool
|
|
:param exp: Report exponentiated parameters rather than raw parameters, default (*None*) is to autodetect based on *model_class*
|
|
:type exp: bool
|
|
|
|
:rtype: :class:`yli.regress.RegressionModel`
|
|
|
|
**Example:** See :class:`yli.OLS`, :class:`yli.Logit`, etc.
|
|
"""
|
|
|
|
if not any(x.__name__ == 'RegressionModel' for x in model_class.__bases__):
|
|
raise ValueError('model_class must be a RegressionModel')
|
|
|
|
# Additional columns to check for NaN - exposure, status, etc.
|
|
additional_columns = []
|
|
|
|
# Build function call arguments
|
|
fit_kwargs = {}
|
|
if exposure is not None:
|
|
additional_columns.append(exposure)
|
|
if family is not None:
|
|
fit_kwargs['family'] = family
|
|
if status is not None:
|
|
additional_columns.append(status)
|
|
if method is not None:
|
|
fit_kwargs['method'] = method
|
|
if maxiter is not None:
|
|
fit_kwargs['maxiter'] = maxiter
|
|
if start_params is not None:
|
|
fit_kwargs['start_params'] = start_params
|
|
if tolerance is not None:
|
|
fit_kwargs['tolerance'] = tolerance
|
|
if reduced is not None:
|
|
fit_kwargs['reduced'] = reduced
|
|
|
|
# Bodge for TimeVaryingCox
|
|
if model_class.__name__ == 'TimeVaryingCox':
|
|
additional_columns.append('index')
|
|
additional_columns.append('start')
|
|
additional_columns.append('stop')
|
|
|
|
# Preprocess data, check for NaN and get design matrices
|
|
df_ref = weakref.ref(df)
|
|
df_clean, dmatrices, dep_categories = df_to_dmatrices(df, dep, formula, nan_policy, additional_columns)
|
|
|
|
# Add function call arguments for supplemental columns - exposure, status, etc.
|
|
if exposure is not None:
|
|
fit_kwargs['exposure'] = df_clean[exposure]
|
|
if status is not None:
|
|
fit_kwargs['status'] = df_clean[status]
|
|
|
|
# Bodge for TimeVaryingCox
|
|
if model_class.__name__ == 'TimeVaryingCox':
|
|
dmatrices = (dmatrices[0], dmatrices[1].join(df_clean[['index', 'start', 'stop']]))
|
|
|
|
# Fit model
|
|
result = model_class.fit(dmatrices[0], dmatrices[1], **fit_kwargs)
|
|
|
|
# Fill in general information
|
|
result.df = df_ref
|
|
result.dep = dep
|
|
result.formula = formula
|
|
result.nan_policy = nan_policy
|
|
if exp is not None:
|
|
result.exp = exp
|
|
result.fitted_dt = datetime.now()
|
|
result.nobs = len(dmatrices[0])
|
|
|
|
# Process categorical terms, etc.
|
|
raw_terms = result.terms
|
|
result.terms = {}
|
|
|
|
for raw_name, raw_term in raw_terms.items():
|
|
# Rename terms
|
|
if raw_name == '(Intercept)':
|
|
# Already processed
|
|
result.terms[raw_name] = raw_term
|
|
elif raw_name == '(Cutoffs)':
|
|
result.terms[raw_name] = raw_term
|
|
|
|
if dep_categories is not None:
|
|
# Need to convert factorised names back into original names
|
|
old_cutoffs = raw_term.categories
|
|
raw_term.categories = {}
|
|
|
|
for cutoff_raw_name, cutoff_term in old_cutoffs.items():
|
|
bits = cutoff_raw_name.split('/')
|
|
term = dep_categories[round(float(bits[0]))] + '/' + dep_categories[round(float(bits[1]))]
|
|
result.terms[raw_name].categories[term] = cutoff_term
|
|
else:
|
|
# Parse if required
|
|
factor, column, contrast = parse_patsy_term(formula, df_clean, raw_name)
|
|
|
|
if contrast is not None:
|
|
# Categorical term
|
|
|
|
if bool_baselevels is False and contrast == 'True' and set(df[column].unique()) == set([True, False]):
|
|
# Treat as single term
|
|
result.terms[column] = raw_term
|
|
else:
|
|
# Add a new categorical term if not exists
|
|
if column not in result.terms:
|
|
ref_category = formula_factor_ref_category(formula, df_clean, factor)
|
|
result.terms[column] = CategoricalTerm({}, ref_category)
|
|
|
|
result.terms[column].categories[contrast] = raw_term
|
|
else:
|
|
# Single term
|
|
result.terms[column] = raw_term
|
|
|
|
return result
|
|
|
|
def df_to_dmatrices(df, dep, formula, nan_policy, additional_columns=[]):
|
|
# Check for/clean NaNs in input columns
|
|
columns = cols_for_formula(dep, df) + cols_for_formula(formula, df)
|
|
|
|
if additional_columns:
|
|
columns += additional_columns
|
|
|
|
df = df[columns]
|
|
df = check_nan(df, nan_policy)
|
|
|
|
# Ensure numeric type for dependent variable
|
|
if '+' in dep:
|
|
dep_categories = None
|
|
else:
|
|
df[dep], dep_categories = as_numeric(df[dep])
|
|
|
|
# Convert pandas nullable types for independent variables as this breaks statsmodels
|
|
df = convert_pandas_nullable(df)
|
|
|
|
# Construct design matrix from formula
|
|
dmatrices = patsy.dmatrices(dep + ' ~ ' + formula, df, return_type='dataframe')
|
|
|
|
return df, dmatrices, dep_categories
|
|
|
|
class RegressionModel:
|
|
# TODO: Documentation
|
|
|
|
def __init__(self):
|
|
# Model configuration (all set in yli.regress)
|
|
self.df = None
|
|
self.dep = None
|
|
self.formula = None
|
|
self.nan_policy = None
|
|
self.exp = False
|
|
self.cov_type = None
|
|
|
|
# Summary information
|
|
self.fitted_dt = None # Set in yli.regress
|
|
self.nobs = None # Set in yli.regress
|
|
self.nevents = None
|
|
self.dof_model = None
|
|
self.dof_resid = None
|
|
self.rsquared = None
|
|
self.ll_model = None
|
|
self.ll_null = None
|
|
self.ll_saturated = None
|
|
self.f_statistic = None
|
|
|
|
# Parameters
|
|
self.terms = None
|
|
self.vcov = None
|
|
|
|
# Comments
|
|
self.comments = []
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return None
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return None
|
|
|
|
def _header_table(self, html):
|
|
"""Return the entries for the header table"""
|
|
|
|
# Left column
|
|
left_col = []
|
|
|
|
left_col.append(('Dep. Variable:', self.dep))
|
|
left_col.append(('Model:', self.model_short_name))
|
|
left_col.append(('Date:', self.fitted_dt.strftime('%Y-%m-%d')))
|
|
left_col.append(('Time:', self.fitted_dt.strftime('%H:%M:%S')))
|
|
if self.cov_type:
|
|
left_col.append(('Std. Errors:', 'Non-Robust' if self.cov_type == 'nonrobust' else self.cov_type.upper() if self.cov_type.startswith('hc') else self.cov_type))
|
|
|
|
# Right column
|
|
right_col = []
|
|
|
|
right_col.append(('No. Observations:', format(self.nobs, '.0f')))
|
|
if self.nevents:
|
|
right_col.append(('No. Events:', format(self.nevents, '.0f')))
|
|
if self.dof_model:
|
|
right_col.append(('Df. Model:', format(self.dof_model, '.0f')))
|
|
if self.dof_resid:
|
|
right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f')))
|
|
if self.rsquared:
|
|
right_col.append(('<i>R</i><sup>2</sup>:' if html else 'R²:', format(self.rsquared, '.2f')))
|
|
elif self.ll_null:
|
|
right_col.append(('Pseudo <i>R</i><sup>2</sup>:' if html else 'Pseudo R²:', format(self.pseudo_rsquared, '.2f')))
|
|
if self.f_statistic:
|
|
# Report the F test if available
|
|
f_result = self.ftest()
|
|
|
|
if html:
|
|
right_col.append(('<i>F</i>:', format(f_result.statistic, '.2f')))
|
|
right_col.append(('<i>p</i> (<i>F</i>):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML)))
|
|
else:
|
|
right_col.append(('F:', format(f_result.statistic, '.2f')))
|
|
right_col.append(('p (F):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY)))
|
|
else:
|
|
# Otherwise report likelihood ratio test as overall test
|
|
right_col.append(('LL-Model:', format(self.ll_model, '.2f')))
|
|
|
|
if self.ll_null:
|
|
lrtest_result = self.lrtest_null()
|
|
|
|
right_col.append(('LL-Null:', format(self.ll_null, '.2f')))
|
|
if html:
|
|
right_col.append(('<i>p</i> (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML)))
|
|
else:
|
|
right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY)))
|
|
|
|
return left_col, right_col
|
|
|
|
def __repr__(self):
|
|
if config.repr_is_summary:
|
|
return self.summary()
|
|
return super().__repr__()
|
|
|
|
def _repr_html_(self):
|
|
# Render header table
|
|
left_col, right_col = self._header_table(html=True)
|
|
|
|
out = '<table><caption>{} Results</caption>'.format(self.model_long_name)
|
|
for left_cell, right_cell in itertools.zip_longest(left_col, right_col):
|
|
out += '<tr><th>{}</th><td>{}</td><th>{}</th><td>{}</td></tr>'.format(
|
|
left_cell[0] if left_cell else '',
|
|
left_cell[1] if left_cell else '',
|
|
right_cell[0] if right_cell else '',
|
|
right_cell[1] if right_cell else ''
|
|
)
|
|
out += '</table>'
|
|
|
|
# Render coefficients table
|
|
out += '<table><tr><th></th><th style="text-align:center">{}</th><th colspan="3" style="text-align:center">({:g}% CI)</th><th style="text-align:center"><i>p</i></th></tr>'.format('exp(<i>β</i>)' if self.exp else '<i>β</i>', (1-config.alpha)*100)
|
|
|
|
for term_name, term in self.terms.items():
|
|
if isinstance(term, SingleTerm):
|
|
# Single term
|
|
|
|
# Exponentiate beta if requested
|
|
beta = term.beta
|
|
if self.exp:
|
|
beta = np.exp(beta)
|
|
|
|
out += '<tr><th>{}</th><td>{:.2f}</td><td style="padding-right:0">({:.2f}</td><td>–</td><td style="padding-left:0">{:.2f})</td><td style="text-align:left">{}</td></tr>'.format(term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
|
|
elif isinstance(term, CategoricalTerm):
|
|
# Categorical term
|
|
out += '<tr><th>{}</th><td></td><td style="padding-right:0"></td><td></td><td style="padding-left:0"></td><td></td></tr>'.format(term_name)
|
|
|
|
# Render reference category
|
|
if term.ref_category is not None:
|
|
out += '<tr><td style="text-align:right;font-style:italic">{}</td><td>Ref.</td><td style="padding-right:0"></td><td></td><td style="padding-left:0"></td><td></td></tr>'.format(term.ref_category)
|
|
|
|
# Loop over terms
|
|
for sub_term_name, sub_term in term.categories.items():
|
|
# Exponentiate beta if requested
|
|
beta = sub_term.beta
|
|
if self.exp:
|
|
beta = np.exp(beta)
|
|
|
|
out += '<tr><td style="text-align:right;font-style:italic">{}</td><td>{:.2f}</td><td style="padding-right:0">({:.2f}</td><td>–</td><td style="padding-left:0">{:.2f})</td><td style="text-align:left">{}</td></tr>'.format(sub_term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(sub_term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
|
|
else:
|
|
raise Exception('Attempt to render unknown term type')
|
|
|
|
out += '</table>'
|
|
|
|
# TODO: Have a detailed view which shows SE, t/z, etc.
|
|
|
|
if self.comments:
|
|
out += '<ol>'
|
|
for comment in self.comments:
|
|
out += '<li>{}</li>'.format(comment)
|
|
out += '</ol>'
|
|
|
|
return out
|
|
|
|
def summary(self):
|
|
"""
|
|
Return a stringified summary of the regression model
|
|
|
|
:rtype: str
|
|
"""
|
|
|
|
# Render header table
|
|
left_col, right_col = self._header_table(html=False)
|
|
|
|
# Ensure equal sizes for SimpleTable
|
|
if len(right_col) > len(left_col):
|
|
left_col.extend([('', '')] * (len(right_col) - len(left_col)))
|
|
elif len(left_col) > len(right_col):
|
|
right_col.extend([('', '')] * (len(left_col) - len(right_col)))
|
|
|
|
table1 = SimpleTable(np.concatenate([left_col, right_col], axis=1), title='{} Results'.format(self.model_long_name))
|
|
table1.insert_stubs(2, [' | '] * len(left_col))
|
|
|
|
# Get rid of last line (merge with next table)
|
|
table1_lines = table1.as_text().splitlines(keepends=False)
|
|
out = '\n'.join(table1_lines[:-1]) + '\n'
|
|
|
|
# Render coefficients table
|
|
table_data = []
|
|
|
|
for term_name, term in self.terms.items():
|
|
if isinstance(term, SingleTerm):
|
|
# Single term
|
|
|
|
# Exponentiate beta if requested
|
|
beta = term.beta
|
|
if self.exp:
|
|
beta = np.exp(beta)
|
|
|
|
# Add some extra padding
|
|
table_data.append([term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(term.pvalue, PValueStyle.TABULAR)])
|
|
elif isinstance(term, CategoricalTerm):
|
|
# Categorical term
|
|
table_data.append([term_name + ' ', '', '', '', '', ''])
|
|
|
|
# Render reference category
|
|
if term.ref_category is not None:
|
|
table_data.append(['{} '.format(term.ref_category), 'Ref.', '', '', '', ''])
|
|
|
|
# Loop over terms
|
|
for sub_term_name, sub_term in term.categories.items():
|
|
# Exponentiate beta if requested
|
|
beta = sub_term.beta
|
|
if self.exp:
|
|
beta = np.exp(beta)
|
|
|
|
table_data.append([sub_term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(sub_term.pvalue, PValueStyle.TABULAR)])
|
|
else:
|
|
raise Exception('Attempt to render unknown term type')
|
|
|
|
table2 = SimpleTable(data=table_data, headers=['', 'exp(β)' if self.exp else 'β', '', '\ue000', '', ' p']) # U+E000 is in Private Use Area, mark middle of CI column
|
|
table2_text = table2.as_text().replace(' \ue000 ', '({:g}% CI)'.format((1-config.alpha)*100)) # Render heading in the right spot
|
|
table2_lines = table2_text.splitlines(keepends=False)
|
|
|
|
# Render divider line between 2 tables
|
|
max_table_len = max(len(table1_lines[-1]), len(table2_lines[-1]))
|
|
out += '=' * max_table_len + '\n'
|
|
|
|
out += '\n'.join(table2_lines[1:])
|
|
|
|
if self.comments:
|
|
out += '\n'
|
|
for i, comment in enumerate(self.comments):
|
|
out += '\n{}. {}'.format(i + 1, comment)
|
|
|
|
return out
|
|
|
|
def terms_flat(self):
|
|
"""
|
|
Iterate over each :class:`SingleTerm` in *self.terms*, recursively stepping through :class:`CategoricalTerm`\ s
|
|
"""
|
|
|
|
for t in self.terms.values():
|
|
if isinstance(t, CategoricalTerm):
|
|
for t in t.categories.values():
|
|
yield t
|
|
else:
|
|
yield t
|
|
|
|
# --------------------
|
|
# Post hoc tests, etc.
|
|
|
|
def bayesfactor_beta_zero(self, term):
|
|
"""
|
|
Compute a Bayes factor testing the hypothesis that the given beta is 0
|
|
|
|
Uses the R *BFpack* library.
|
|
|
|
Requires the regression to be from statsmodels.
|
|
The term must be specified as the *raw name* from the statsmodels regression, available via :attr:`SingleTerm.raw_name`.
|
|
|
|
:param term: Raw name of the term to be tested
|
|
:type term: str
|
|
|
|
:rtype: :class:`yli.bayes_factors.BayesFactor`
|
|
"""
|
|
|
|
# FIXME: Allow specifying our renamed terms
|
|
|
|
# Get parameters required for AFBF
|
|
raw_params = {}
|
|
for t in self.terms_flat():
|
|
raw_params[t.raw_name.replace('[', '_').replace(']', '_')] = t.beta.point
|
|
|
|
# Compute BF matrix
|
|
bf01 = bayesfactor_afbf(pd.Series(raw_params), self.vcov, self.nobs, '{} = 0'.format(term.replace('[', '_').replace(']', '_')))
|
|
bf01 = BayesFactor(bf01.factor, '0', '{} = 0'.format(term), '1', '{} ≠ 0'.format(term))
|
|
|
|
if bf01.factor >= 1:
|
|
return bf01
|
|
else:
|
|
return bf01.invert()
|
|
|
|
def deviance_chi2(self):
|
|
"""
|
|
Perform the deviance *χ*:sup:`2` test for goodness of fit
|
|
|
|
:rtype: :class:`yli.sig_tests.ChiSquaredResult`
|
|
"""
|
|
|
|
deviance_model = 2 * (self.ll_saturated - self.ll_model)
|
|
pvalue = 1 - stats.chi2.cdf(deviance_model, df=self.dof_resid)
|
|
|
|
return ChiSquaredResult(deviance_model, int(self.dof_resid), pvalue)
|
|
|
|
def ftest(self):
|
|
"""
|
|
Perform the *F* test that all slopes are 0
|
|
|
|
:rtype: :class:`yli.sig_tests.FTestResult`
|
|
"""
|
|
|
|
pvalue = 1 - stats.f(self.dof_model, self.dof_resid).cdf(self.f_statistic)
|
|
return FTestResult(self.f_statistic, self.dof_model, self.dof_resid, pvalue)
|
|
|
|
def lrtest_null(self):
|
|
"""
|
|
Compute the likelihood ratio test comparing the model to the null model
|
|
|
|
:rtype: :class:`LikelihoodRatioTestResult`
|
|
"""
|
|
|
|
statistic = -2 * (self.ll_null - self.ll_model)
|
|
pvalue = 1 - stats.chi2.cdf(statistic, self.dof_model)
|
|
LikelihoodRatioTestResult
|
|
return LikelihoodRatioTestResult(statistic, self.dof_model, pvalue)
|
|
|
|
@property
|
|
def pseudo_rsquared(self):
|
|
"""McFadden's pseudo *R*:sup:`2` statistic"""
|
|
|
|
return 1 - self.ll_model/self.ll_null
|
|
|
|
@property
|
|
def pseudo_rsquared_adj(self):
|
|
"""McFadden's pseudo *R*:sup:`2` statistic, adjusted for number of parameters"""
|
|
|
|
return 1 - (self.ll_model - self.dof_model) / self.ll_null
|
|
|
|
def shap(self, **kwargs):
|
|
"""
|
|
Compute SHAP values for the model
|
|
|
|
Uses the Python *shap* library.
|
|
|
|
:param kwargs: Keyword arguments to pass to *shap.LinearExplainer*
|
|
|
|
:rtype: :class:`yli.shap.ShapResult`
|
|
|
|
**Reference:** Lundberg SM, Lee SI. A unified approach to interpreting model predictions. In: Guyon I, Von Luxburg U, Bengio S, et al., editors. *Advances in Neural Information Processing Systems*; 2017 Dec 4–9; Long Beach, CA. https://proceedings.neurips.cc/paper/2017/hash/8a20a8621978632d76c43dfd28b67767-Abstract.html
|
|
"""
|
|
|
|
import shap
|
|
|
|
xdata = ShapResult._get_xdata(self)
|
|
|
|
# Combine terms into single list
|
|
params = []
|
|
for term in self.terms.values():
|
|
if isinstance(term, SingleTerm):
|
|
params.append(term.beta.point)
|
|
else:
|
|
params.extend(s.beta.point for s in term.categories.values())
|
|
|
|
explainer = shap.LinearExplainer((np.array(params[1:]), params[0]), xdata, **kwargs) # FIXME: Assumes zeroth term is intercept
|
|
shap_values = explainer.shap_values(xdata).astype('float')
|
|
|
|
return ShapResult(weakref.ref(self), shap_values, list(xdata.columns))
|
|
|
|
class LikelihoodRatioTestResult(ChiSquaredResult):
|
|
"""
|
|
Result of a likelihood ratio test for regression
|
|
|
|
See :meth:`RegressionModel.lrtest_null`.
|
|
"""
|
|
|
|
def __init__(self, statistic, dof, pvalue):
|
|
super().__init__(statistic, dof, pvalue)
|
|
|
|
def _repr_html_(self):
|
|
return 'LR({}) = {:.2f}; <i>p</i> {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION | PValueStyle.HTML))
|
|
|
|
def summary(self):
|
|
"""
|
|
Return a stringified summary of the likelihood ratio test
|
|
|
|
:rtype: str
|
|
"""
|
|
|
|
return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION))
|
|
|
|
class SingleTerm:
|
|
"""A term in a :class:`RegressionModel` which is a single term"""
|
|
|
|
def __init__(self, raw_name, beta, pvalue):
|
|
#: Raw name of the term (*str*)
|
|
self.raw_name = raw_name
|
|
#: :class:`yli.utils.Estimate` of the coefficient
|
|
self.beta = beta
|
|
#: *p* value for the coefficient (*float*)
|
|
self.pvalue = pvalue
|
|
|
|
class CategoricalTerm:
|
|
"""A group of terms in a :class:`RegressionModel` corresponding to a categorical variable"""
|
|
|
|
def __init__(self, categories, ref_category):
|
|
#: Terms for each of the categories, excluding the reference category (*dict* of :class:`SingleTerm`)
|
|
self.categories = categories
|
|
#: Name of the reference category (*str*)
|
|
self.ref_category = ref_category
|
|
|
|
def raw_terms_from_statsmodels_result(raw_result, *, wrapped=True):
|
|
if wrapped:
|
|
zipped_iter = zip(raw_result.model.exog_names, raw_result.params.values, raw_result.conf_int(config.alpha).values, raw_result.pvalues.values)
|
|
else:
|
|
zipped_iter = zip(raw_result.model.exog_names, raw_result.params, raw_result.conf_int(config.alpha), raw_result.pvalues)
|
|
|
|
return {
|
|
('(Intercept)' if raw_name == 'Intercept' else raw_name): SingleTerm(
|
|
raw_name=raw_name,
|
|
beta=Estimate(raw_param, raw_ci[0], raw_ci[1]),
|
|
pvalue=raw_p
|
|
)
|
|
for raw_name, raw_param, raw_ci, raw_p in zipped_iter
|
|
}
|
|
|
|
# ------------------------
|
|
# Concrete implementations
|
|
|
|
class Cox(RegressionModel):
|
|
# TODO: Documentation
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return 'Cox Regression'
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return 'Cox'
|
|
|
|
@classmethod
|
|
def fit(cls, data_dep, data_ind, *, status=None):
|
|
# Drop explicit intercept term
|
|
if 'Intercept' in data_ind:
|
|
del data_ind['Intercept']
|
|
|
|
result = cls()
|
|
result.exp = True
|
|
result.cov_type = 'nonrobust'
|
|
result.nevents = status.sum()
|
|
result.dof_model = len(data_ind.columns)
|
|
|
|
# Perform regression
|
|
raw_result = sm.PHReg(endog=data_dep, exog=data_ind, status=status, missing='raise').fit(disp=False)
|
|
|
|
result.ll_model = raw_result.llf
|
|
|
|
result.terms = raw_terms_from_statsmodels_result(raw_result, wrapped=False)
|
|
result.vcov = raw_result.cov_params()
|
|
|
|
return result
|
|
|
|
class GLM(RegressionModel):
|
|
# TODO: Documentation
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return 'Generalised Linear Model'
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return 'GLM'
|
|
|
|
@classmethod
|
|
def fit(cls, data_dep, data_ind, family, method='irls', maxiter=None, start_params=None):
|
|
result = cls()
|
|
result.exp = True
|
|
result.cov_type = 'nonrobust'
|
|
|
|
if family == 'binomial':
|
|
sm_family = sm.families.Binomial()
|
|
elif family == 'gamma':
|
|
sm_family = sm.families.Gamma()
|
|
elif family == 'gaussian':
|
|
sm_family = sm.families.Gaussian()
|
|
elif family == 'inverse_gaussian':
|
|
sm_family = sm.families.InverseGaussian()
|
|
elif family == 'negative_binomial':
|
|
sm_family = sm.families.NegativeBinomial()
|
|
elif family == 'poisson':
|
|
sm_family = sm.families.Poisson()
|
|
elif family == 'tweedie':
|
|
sm_family = sm.families.Tweedie()
|
|
else:
|
|
raise ValueError('Unknown GLM family')
|
|
|
|
# Perform regression
|
|
raw_result = sm.GLM(endog=data_dep, exog=data_ind, family=sm_family, missing='raise').fit(disp=False, method=method, start_params=start_params)
|
|
|
|
result.dof_model = raw_result.df_model
|
|
result.dof_resid = raw_result.df_resid
|
|
result.ll_model = raw_result.llf
|
|
result.ll_null = raw_result.llnull
|
|
|
|
result.terms = raw_terms_from_statsmodels_result(raw_result)
|
|
result.vcov = raw_result.cov_params()
|
|
|
|
return result
|
|
|
|
class IntervalCensoredCox(RegressionModel):
|
|
"""
|
|
Interval-censored Cox regression
|
|
|
|
Uses hpstat *intcox* command.
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame(...)
|
|
yli.regress(yli.IntervalCensoredCox, df, 'Left_Time + Right_Time', 'A + B + C + D + E + F + G + H')
|
|
|
|
.. code-block:: text
|
|
|
|
Interval-Censored Cox Regression Results
|
|
===================================================================
|
|
Dep. Variable: Left_Time + Right_Time | No. Observations: 1124
|
|
Model: Interval-Censored Cox | No. Events: 133
|
|
Date: 2023-04-17 | Df. Model: 8
|
|
Time: 22:30:40 | Pseudo R²: 0.00
|
|
Std. Errors: OPG | LL-Model: -613.21
|
|
| LL-Null: -615.81
|
|
| p (LR): 0.82
|
|
===================================================================
|
|
exp(β) (95% CI) p
|
|
----------------------------------
|
|
A 0.83 (0.37 - 1.88) 0.66
|
|
B 1.08 (0.81 - 1.46) 0.60
|
|
C 0.49 (0.24 - 1.00) 0.052
|
|
D 0.79 (0.42 - 1.50) 0.48
|
|
E 0.87 (0.40 - 1.85) 0.71
|
|
F 0.64 (0.28 - 1.45) 0.29
|
|
G 1.07 (0.44 - 2.62) 0.88
|
|
H 1.23 (0.48 - 3.20) 0.67
|
|
----------------------------------
|
|
|
|
The output summarises the result of the regression.
|
|
|
|
**Reference:** Zeng D, Mao L, Lin DY. Maximum likelihood estimation for semiparametric transformation models with interval-censored data. *Biometrika*. 2016;103(2):253–71. `doi:10.1093/biomet/asw013 <https://doi.org/10.1093/biomet/asw013>`_
|
|
"""
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return 'Interval-Censored Cox Regression'
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return 'Interval-Censored Cox'
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
|
|
# TODO: Documentation
|
|
self.lambda_ = None
|
|
|
|
@classmethod
|
|
def fit(cls, data_dep, data_ind, *, reduced=False, maxiter=None, tolerance=None):
|
|
if len(data_dep.columns) != 2:
|
|
raise ValueError('IntervalCensoredCox requires left and right times')
|
|
|
|
# Drop explicit intercept term
|
|
if 'Intercept' in data_ind:
|
|
del data_ind['Intercept']
|
|
|
|
result = cls()
|
|
result.exp = True
|
|
result.cov_type = 'OPG'
|
|
result.nevents = np.isfinite(data_dep.iloc[:, 1]).sum()
|
|
result.dof_model = len(data_ind.columns)
|
|
|
|
# Prepare arguments
|
|
intcox_args = [config.hpstat_path, 'intcox', '-', '--output', 'json']
|
|
if reduced:
|
|
intcox_args.append('--reduced')
|
|
if maxiter:
|
|
intcox_args.append('--max-iterations')
|
|
intcox_args.append(str(maxiter))
|
|
if tolerance:
|
|
intcox_args.append('--param-tolerance')
|
|
intcox_args.append(str(tolerance))
|
|
|
|
# Export data to CSV
|
|
csv_buf = io.StringIO()
|
|
data_dep.join(data_ind).to_csv(csv_buf, index=False)
|
|
csv_str = csv_buf.getvalue()
|
|
|
|
# Run intcens binary
|
|
proc = subprocess.run(intcox_args, input=csv_str, stdout=subprocess.PIPE, stderr=None, encoding='utf-8', check=True)
|
|
raw_result = json.loads(proc.stdout)
|
|
|
|
from IPython.display import clear_output
|
|
clear_output(wait=True)
|
|
|
|
z_critical = -stats.norm.ppf(config.alpha / 2)
|
|
result.terms = {raw_name: SingleTerm(
|
|
raw_name=raw_name,
|
|
beta=Estimate(raw_param, raw_param - z_critical * raw_se, raw_param + z_critical * raw_se),
|
|
pvalue=stats.norm.cdf(-np.abs(raw_param) / raw_se) * 2
|
|
) for raw_name, raw_param, raw_se in zip(data_ind.columns, raw_result['params'], raw_result['params_se'])}
|
|
|
|
result.cumulative_hazard = pd.Series(data=raw_result['cumulative_hazard'], index=raw_result['cumulative_hazard_times'])
|
|
|
|
result.ll_model = raw_result['ll_model']
|
|
result.ll_null = raw_result['ll_null']
|
|
|
|
return result
|
|
|
|
def survival_function(self):
|
|
# TODO: Documentation
|
|
|
|
return np.exp(-self.cumulative_hazard)
|
|
|
|
def plot_survival_function(self, ax=None):
|
|
# TODO: Documentation
|
|
|
|
import matplotlib.pyplot as plt
|
|
|
|
if ax is None:
|
|
fig, ax = plt.subplots()
|
|
|
|
sf = self.survival_function()
|
|
|
|
# Draw straight lines
|
|
# np.concatenate(...) to force starting drawing from time 0, survival 100%
|
|
xpoints = np.concatenate([[0], sf.index]).repeat(2)[1:]
|
|
ypoints = np.concatenate([[1], sf]).repeat(2)[:-1]
|
|
|
|
ax.plot(xpoints, ypoints)
|
|
|
|
ax.set_xlabel('Analysis time')
|
|
ax.set_ylabel('Survival probability')
|
|
ax.set_xlim(left=0)
|
|
ax.set_ylim(0, 1)
|
|
|
|
def plot_loglog_survival(self, ax=None):
|
|
# TODO: Documentation
|
|
|
|
import matplotlib.pyplot as plt
|
|
|
|
if ax is None:
|
|
fig, ax = plt.subplots()
|
|
|
|
sf = self.survival_function()
|
|
|
|
# Draw straight lines
|
|
xpoints = np.log(sf.index).to_numpy().repeat(2)[1:]
|
|
ypoints = (-np.log(-np.log(sf))).to_numpy().repeat(2)[:-1]
|
|
|
|
ax.plot(xpoints, ypoints)
|
|
|
|
ax.set_xlabel('ln(Analysis time)')
|
|
ax.set_ylabel('−ln(−ln(Survival probability))')
|
|
|
|
class Logit(RegressionModel):
|
|
"""
|
|
Logistic regression
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame({
|
|
'Unhealthy': [False, False, False, ...],
|
|
'Fibrinogen': [2.52, 2.46, 2.29, ...],
|
|
'GammaGlobulin': [38, 36, 36, ...]
|
|
})
|
|
yli.regress(yli.Logit, df, 'Unhealthy', 'Fibrinogen + GammaGlobulin')
|
|
|
|
.. code-block:: text
|
|
|
|
Logistic Regression Results
|
|
======================================================
|
|
Dep. Variable: Unhealthy | No. Observations: 32
|
|
Model: Logit | Df. Model: 2
|
|
Date: 2022-10-18 | Df. Residuals: 29
|
|
Time: 19:00:34 | Pseudo R²: 0.26
|
|
Std. Errors: Non-Robust | LL-Model: -11.47
|
|
| LL-Null: -15.44
|
|
| p (LR): 0.02*
|
|
======================================================
|
|
exp(β) (95% CI) p
|
|
-----------------------------------------------
|
|
(Intercept) 0.00 (0.00 - 0.24) 0.03*
|
|
Fibrinogen 6.80 (1.01 - 45.79) 0.049*
|
|
GammaGlobulin 1.17 (0.92 - 1.48) 0.19
|
|
-----------------------------------------------
|
|
|
|
The output summarises the results of the regression.
|
|
Note that the parameter estimates are automatically exponentiated.
|
|
For example, the adjusted odds ratio for unhealthiness per unit increase in fibrinogen is 6.80, with 95% confidence interval 1.01–45.79, and is significant with *p* value 0.049.
|
|
"""
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return 'Logistic Regression'
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return 'Logit'
|
|
|
|
@classmethod
|
|
def fit(cls, data_dep, data_ind):
|
|
result = cls()
|
|
result.exp = True
|
|
result.cov_type = 'nonrobust'
|
|
|
|
# Perform regression
|
|
raw_result = sm.Logit(endog=data_dep, exog=data_ind, missing='raise').fit(disp=False)
|
|
|
|
result.dof_model = raw_result.df_model
|
|
result.dof_resid = raw_result.df_resid
|
|
result.ll_model = raw_result.llf
|
|
result.ll_null = raw_result.llnull
|
|
|
|
result.terms = raw_terms_from_statsmodels_result(raw_result)
|
|
result.vcov = raw_result.cov_params()
|
|
|
|
return result
|
|
|
|
class OLS(RegressionModel):
|
|
"""
|
|
Ordinary least squares linear regression
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame(...)
|
|
yli.regress(yli.OLS, df, 'LNC', 'D + T1 + T2 + S + PR + NE + CT + BW + N + PT')
|
|
|
|
.. code-block:: text
|
|
|
|
Ordinary Least Squares Regression Results
|
|
=======================================================
|
|
Dep. Variable: LNC | No. Observations: 32
|
|
Model: OLS | Df. Model: 10
|
|
Date: 2023-04-16 | Df. Residuals: 21
|
|
Time: 23:34:01 | R²: 0.86
|
|
Std. Errors: Non-Robust | F: 13.28
|
|
| p (F): <0.001*
|
|
=======================================================
|
|
β (95% CI) p
|
|
----------------------------------------------
|
|
(Intercept) -10.63 (-22.51 - 1.24) 0.08
|
|
D 0.23 (0.05 - 0.41) 0.02*
|
|
T1 0.01 (-0.04 - 0.05) 0.82
|
|
T2 0.01 (-0.00 - 0.02) 0.24
|
|
S 0.00 (0.00 - 0.00) <0.001*
|
|
PR -0.11 (-0.28 - 0.07) 0.21
|
|
NE 0.26 (0.09 - 0.42) 0.004*
|
|
CT 0.12 (-0.03 - 0.26) 0.12
|
|
BW 0.04 (-0.18 - 0.26) 0.73
|
|
N -0.01 (-0.03 - 0.00) 0.14
|
|
PT -0.22 (-0.49 - 0.05) 0.10
|
|
----------------------------------------------
|
|
|
|
The output summarises the results of the regression.
|
|
For example, the adjusted mean difference in "LNC" per unit increase in "D" is 0.23, with 95% confidence interval 0.05–0.41, and is significant with *p* value 0.02.
|
|
"""
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return 'Ordinary Least Squares Regression'
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return 'OLS'
|
|
|
|
@classmethod
|
|
def fit(cls, data_dep, data_ind):
|
|
result = cls()
|
|
result.cov_type = 'nonrobust'
|
|
|
|
# Perform regression
|
|
raw_result = sm.OLS(endog=data_dep, exog=data_ind, missing='raise', hasconst=True).fit()
|
|
|
|
result.dof_model = raw_result.df_model
|
|
result.dof_resid = raw_result.df_resid
|
|
result.rsquared = raw_result.rsquared
|
|
result.ll_model = raw_result.llf
|
|
result.f_statistic = raw_result.fvalue
|
|
|
|
result.terms = raw_terms_from_statsmodels_result(raw_result)
|
|
result.vcov = raw_result.cov_params()
|
|
|
|
return result
|
|
|
|
class OrdinalLogit(RegressionModel):
|
|
"""
|
|
Ordinal logistic (or probit) regression
|
|
|
|
The implementation calls statsmodels' native *OrderedModel*, but substitutes an alternative parameterisation used by R and Stata.
|
|
In the native statsmodels implementation, the first cutoff parameter is the true cutoff, but further cutoff parameter are log differences between consecutive cutoffs.
|
|
In this parameterisation, cutoff terms are represented directly in the model.
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame(...)
|
|
yli.regress(yli.OrdinalLogit, df, 'apply', 'pared + public + gpa', exp=False)
|
|
|
|
.. code-block:: text
|
|
|
|
Ordinal Logistic Regression Results
|
|
==========================================================
|
|
Dep. Variable: apply | No. Observations: 400
|
|
Model: Ordinal Logit | Df. Model: 3
|
|
Date: 2022-12-02 | Df. Residuals: 395
|
|
Time: 21:30:38 | Pseudo R²: 0.03
|
|
Std. Errors: Non-Robust | LL-Model: -358.51
|
|
| LL-Null: -370.60
|
|
| p (LR): <0.001*
|
|
============================================================
|
|
β (95% CI) p
|
|
------------------------------------------------------------
|
|
pared 1.05 (0.53 - 1.57) <0.001*
|
|
public -0.06 (-0.64 - 0.53) 0.84
|
|
gpa 0.62 (0.10 - 1.13) 0.02*
|
|
(Cutoffs)
|
|
unlikely/somewhat likely 2.20 (0.68 - 3.73) 0.005*
|
|
somewhat likely/very likely 4.30 (2.72 - 5.88) <0.001*
|
|
------------------------------------------------------------
|
|
|
|
The output summarises the result of the regression.
|
|
The parameters shown under "(Cutoffs)" are the cutoff values in the latent variable parameterisation of ordinal regression.
|
|
Note that because `exp=False` was passed, the parameter estimates are not automatically exponentiated.
|
|
"""
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return 'Ordinal Logistic Regression'
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return 'Ordinal Logit'
|
|
|
|
@classmethod
|
|
def fit(cls, data_dep, data_ind):
|
|
result = cls()
|
|
result.cov_type = 'nonrobust'
|
|
|
|
# Drop explicit intercept term
|
|
if 'Intercept' in data_ind:
|
|
del data_ind['Intercept']
|
|
|
|
# Perform regression
|
|
raw_result = _SMOrdinalLogit(endog=data_dep, exog=data_ind, missing='raise').fit(disp=False)
|
|
|
|
result.dof_model = raw_result.df_model
|
|
result.dof_resid = raw_result.df_resid
|
|
result.ll_model = raw_result.llf
|
|
result.ll_null = raw_result.llnull
|
|
|
|
raw_terms = raw_terms_from_statsmodels_result(raw_result)
|
|
result.terms = {}
|
|
|
|
# Group ordinal regression cutoffs
|
|
for raw_name, raw_term in raw_terms.items():
|
|
if '/' in raw_name:
|
|
if '(Cutoffs)' not in result.terms:
|
|
result.terms['(Cutoffs)'] = CategoricalTerm({}, None)
|
|
|
|
result.terms['(Cutoffs)'].categories[raw_name] = raw_term
|
|
else:
|
|
result.terms[raw_name] = raw_term
|
|
|
|
return result
|
|
|
|
def brant(self):
|
|
"""
|
|
Perform the Brant test for the parallel regression assumption in ordinal regression
|
|
|
|
:rtype: :class:`BrantResult`
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame(...)
|
|
model = yli.regress(yli.OrdinalLogit, df, 'apply', 'pared + public + gpa', exp=False)
|
|
model.brant()
|
|
|
|
.. code-block:: text
|
|
|
|
χ² df p
|
|
Omnibus 4.34 3 0.23
|
|
pared 0.13 1 0.72
|
|
public 3.44 1 0.06
|
|
gpa 0.18 1 0.67
|
|
|
|
The output shows the result of the Brant test. For example, for the omnibus test of the parallel regression assumption across all independent variables, the *χ*:sup:`2` statistic is 4.34, the *χ*:sup:`2` distribution has 3 degrees of freedom, and the test is not significant, with *p* value 0.23.
|
|
|
|
**Reference:** Brant R. Assessing proportionality in the proportional odds model for ordinal logistic regression. *Biometrics*. 1990;46(4):1171–8. `doi:10.2307/2532457 <https://doi.org/10.2307/2532457>`_
|
|
"""
|
|
|
|
df = self.df()
|
|
if df is None:
|
|
raise Exception('Referenced DataFrame has been dropped')
|
|
dep = self.dep
|
|
|
|
# Precompute design matrices
|
|
_, dmatrices, dep_categories = df_to_dmatrices(df, dep, self.formula, 'omit')
|
|
s_dep = dmatrices[0][dmatrices[0].columns[0]] # df[dep] as series - must get this from dmatrices to account for as_numeric
|
|
dmatrix_right = dmatrices[1]
|
|
dmatrix_right.reset_index(drop=True, inplace=True) # Otherwise this confuses matrix multiplication
|
|
|
|
# Fit individual logistic regressions
|
|
logit_models = []
|
|
for upper_limit in sorted(s_dep.unique())[:-1]:
|
|
dep_dichotomous = (s_dep <= upper_limit).astype(int).reset_index(drop=True)
|
|
logit_result = sm.Logit(dep_dichotomous, dmatrix_right).fit(disp=False)
|
|
|
|
if not logit_result.mle_retvals['converged']:
|
|
raise Exception('Maximum likelihood estimation failed to converge for {} <= {}. Check raw_result.mle_retvals.'.format(dep, upper_limit))
|
|
|
|
if pd.isna(logit_result.bse).any():
|
|
raise Exception('Regression returned NaN standard errors for {} <= {}.'.format(dep, upper_limit))
|
|
|
|
logit_models.append(logit_result)
|
|
|
|
logit_betas = np.array([model._results.params for model in logit_models]).T
|
|
logit_pihat = np.array([expit(-model.fittedvalues) for model in logit_models]).T # Predicted probabilities
|
|
|
|
# vcov is the variance-covariance matrix of all individually fitted betas across all terms
|
|
|
|
# | model 1 | model 2 | model 3 | ...
|
|
# | term 1 | term 2 | term 1 | term 2 | term 1 | term 2 | ...
|
|
# model 1 | term 1 |
|
|
# | term 2 |
|
|
# model 2 | term 1 |
|
|
# | term 2 |
|
|
# ...
|
|
|
|
n_terms = len(dmatrix_right.columns) - 1 # number of beta terms (excluding intercept)
|
|
n_betas = len(logit_models) * n_terms
|
|
vcov = np.zeros((n_betas, n_betas))
|
|
|
|
# Populate the variance-covariance matrix for comparisons between individually fitted models
|
|
for j in range(0, len(logit_models) - 1):
|
|
for l in range(j + 1, len(logit_models)):
|
|
Wjj = np.diag(logit_pihat[:,j] - logit_pihat[:,j] * logit_pihat[:,j])
|
|
Wjl = np.diag(logit_pihat[:,l] - logit_pihat[:,j] * logit_pihat[:,l])
|
|
Wll = np.diag(logit_pihat[:,l] - logit_pihat[:,l] * logit_pihat[:,l])
|
|
|
|
matrix_result = np.linalg.inv(dmatrix_right.T @ Wjj @ dmatrix_right) @ dmatrix_right.T @ Wjl @ dmatrix_right @ np.linalg.inv(dmatrix_right.T @ Wll @ dmatrix_right)
|
|
j_vs_l_vcov = np.asarray(matrix_result)[1:,1:] # Asymptotic covariance for j,l
|
|
|
|
vcov[j*n_terms:(j+1)*n_terms, l*n_terms:(l+1)*n_terms] = j_vs_l_vcov
|
|
vcov[l*n_terms:(l+1)*n_terms, j*n_terms:(j+1)*n_terms] = j_vs_l_vcov
|
|
|
|
# Populate the variance-covariance matrix within each individually fitted model
|
|
for i in range(len(logit_models)):
|
|
vcov[i*n_terms:(i+1)*n_terms, i*n_terms:(i+1)*n_terms] = logit_models[i]._results.cov_params()[1:,1:]
|
|
|
|
# ------------------
|
|
# Perform Wald tests
|
|
|
|
beta_names = ['{}_{}'.format(raw_name, i) for i in range(len(logit_models)) for raw_name in dmatrix_right.columns[1:]]
|
|
wald_results = {}
|
|
|
|
# Omnibus test
|
|
constraints = [' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models))) for raw_name in dmatrix_right.columns[1:]]
|
|
constraint = ', '.join(constraints)
|
|
dof = (len(logit_models) - 1) * (len(dmatrix_right.columns) - 1) # df = (number of levels minus 2) * (number of terms excluding intercept)
|
|
wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, dof)
|
|
wald_results['Omnibus'] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue)
|
|
|
|
# Individual terms
|
|
for raw_name in dmatrix_right.columns[1:]:
|
|
constraint = ' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models)))
|
|
dof = len(logit_models) - 1 # df = (number of levels minus 2)
|
|
wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, dof)
|
|
wald_results[raw_name] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue)
|
|
|
|
return BrantResult(wald_results)
|
|
|
|
class _SMOrdinalLogit(statsmodels.miscmodels.ordinal_model.OrderedModel):
|
|
def __init__(self, endog, exog, **kwargs):
|
|
if 'distr' not in kwargs:
|
|
kwargs['distr'] = 'logit'
|
|
|
|
super().__init__(endog, exog, **kwargs)
|
|
|
|
def transform_threshold_params(self, params):
|
|
th_params = params[-(self.k_levels - 1):]
|
|
thresh = np.concatenate(([-np.inf], th_params, [np.inf]))
|
|
return thresh
|
|
|
|
def transform_reverse_threshold_params(self, params):
|
|
return params[:-1]
|
|
|
|
class PenalisedLogit(RegressionModel):
|
|
"""
|
|
Firth penalised logistic regression
|
|
|
|
Uses the R *logistf* library.
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame({
|
|
'Pred': [1] * 20 + [0] * 220,
|
|
'Outcome': [1] * 40 + [0] * 200
|
|
})
|
|
yli.regress(yli.PenalisedLogit, df, 'Outcome', 'Pred', exp=False)
|
|
|
|
.. code-block:: text
|
|
|
|
Penalised Logistic Regression Results
|
|
============================================================
|
|
Dep. Variable: Outcome | No. Observations: 240
|
|
Model: Penalised Logit | Df. Model: 1
|
|
Date: 2022-10-19 | Pseudo R²: 0.37
|
|
Time: 07:50:40 | LL-Model: -66.43
|
|
Std. Errors: Non-Robust | LL-Null: -105.91
|
|
| p (LR): <0.001*
|
|
============================================================
|
|
β (95% CI) p
|
|
---------------------------------------------
|
|
(Intercept) -2.28 (-2.77 - -1.85) <0.001*
|
|
Pred 5.99 (3.95 - 10.85) <0.001*
|
|
---------------------------------------------
|
|
|
|
The output summarises the result of the regression.
|
|
The summary table shows that a penalised method was applied.
|
|
Note that because `exp=False` was passed, the parameter estimates are not automatically exponentiated.
|
|
"""
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return 'Penalised Logistic Regression'
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return 'Penalised Logit'
|
|
|
|
@classmethod
|
|
def fit(cls, data_dep, data_ind):
|
|
result = cls()
|
|
result.cov_type = 'nonrobust'
|
|
|
|
import rpy2.robjects as ro
|
|
import rpy2.robjects.packages
|
|
import rpy2.robjects.pandas2ri
|
|
|
|
df = data_dep.join(data_ind)
|
|
|
|
# Drop explicit intercept term
|
|
if 'Intercept' in df:
|
|
del df['Intercept']
|
|
|
|
# Import logistf
|
|
ro.packages.importr('logistf')
|
|
|
|
with ro.conversion.localconverter(ro.default_converter + ro.pandas2ri.converter):
|
|
with ro.local_context() as lc:
|
|
# Convert DataFrame to R
|
|
lc['df'] = df
|
|
|
|
# Transfer other parameters to R
|
|
lc['formula_'] = df.columns[0] + ' ~ ' + ' + '.join(df.columns[1:])
|
|
lc['alpha'] = config.alpha
|
|
|
|
# Fit the model
|
|
model = ro.r('logistf(formula_, data=df, alpha=alpha)')
|
|
|
|
result.dof_model = model['df'][0]
|
|
result.ll_model = model['loglik'][0]
|
|
result.ll_null = model['loglik'][1]
|
|
|
|
result.terms = {t: SingleTerm(t, Estimate(b, ci0, ci1), p) for t, b, ci0, ci1, p in zip(model['terms'], model['coefficients'], model['ci.lower'], model['ci.upper'], model['prob'])}
|
|
|
|
return result
|
|
|
|
class Poisson(RegressionModel):
|
|
"""
|
|
Poisson regression
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame(...)
|
|
yli.regress(yli.Poisson, df, 'num_awards', 'C(prog) + math')
|
|
|
|
.. code-block:: text
|
|
|
|
Poisson Regression Results
|
|
=======================================================
|
|
Dep. Variable: num_awards | No. Observations: 200
|
|
Model: Poisson | Df. Model: 3
|
|
Date: 2023-04-22 | Df. Residuals: 196
|
|
Time: 16:58:21 | Pseudo R²: 0.21
|
|
Std. Errors: Non-Robust | LL-Model: -182.75
|
|
| LL-Null: -231.86
|
|
| p (LR): <0.001*
|
|
=======================================================
|
|
exp(β) (95% CI) p
|
|
--------------------------------------------
|
|
(Intercept) 0.01 (0.00 - 0.02) <0.001*
|
|
prog
|
|
1 Ref.
|
|
2 2.96 (1.46 - 5.97) 0.002*
|
|
3 1.45 (0.61 - 3.44) 0.40
|
|
math 1.07 (1.05 - 1.10) <0.001*
|
|
--------------------------------------------
|
|
|
|
The output summarises the results of the regression.
|
|
For example, the adjusted incidence rate ratio in "num_awards" per unit increase in "math" is 1.07, with 95% confidence interval 1.05–1.10, and is significant with *p* value < 0.001.
|
|
"""
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return 'Poisson Regression'
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return 'Poisson'
|
|
|
|
@classmethod
|
|
def fit(cls, data_dep, data_ind, exposure=None, method='newton', maxiter=None, start_params=None):
|
|
result = cls()
|
|
result.exp = True
|
|
result.cov_type = 'nonrobust'
|
|
|
|
# Perform regression
|
|
raw_result = sm.Poisson(endog=data_dep, exog=data_ind, exposure=exposure, missing='raise').fit(disp=False, method=method, start_params=start_params)
|
|
|
|
result.dof_model = raw_result.df_model
|
|
result.dof_resid = raw_result.df_resid
|
|
result.ll_model = raw_result.llf
|
|
result.ll_null = raw_result.llnull
|
|
|
|
# Compute saturated log-likelihood
|
|
result.ll_saturated = float(sm.families.Poisson().loglike(data_dep + 1e-10, data_dep + 1e-10))
|
|
|
|
result.terms = raw_terms_from_statsmodels_result(raw_result)
|
|
result.vcov = raw_result.cov_params()
|
|
|
|
return result
|
|
|
|
class TimeVaryingCox(RegressionModel):
|
|
# TODO: Documentation
|
|
# Requires df to be in lifelines long format
|
|
|
|
@property
|
|
def model_long_name(self):
|
|
return 'Time-Varying Cox Regression'
|
|
|
|
@property
|
|
def model_short_name(self):
|
|
return 'Time-Varying Cox'
|
|
|
|
@classmethod
|
|
def fit(cls, data_dep, data_ind, exposure=None, method='newton', maxiter=None, start_params=None):
|
|
result = cls()
|
|
result.exp = True
|
|
result.cov_type = 'nonrobust'
|
|
|
|
import lifelines
|
|
|
|
# Perform regression
|
|
ctv = lifelines.CoxTimeVaryingFitter(penalizer=0)
|
|
ctv.fit(data_dep.join(data_ind), id_col='index', event_col=data_dep.columns[0], start_col='start', stop_col='stop')
|
|
|
|
result.nobs = ctv._n_unique
|
|
result.nevents = ctv.event_observed.sum()
|
|
result.dof_model = len(ctv.params_)
|
|
result.ll_model = ctv.log_likelihood_
|
|
result.ll_null = ctv._log_likelihood_null
|
|
|
|
result.terms = {
|
|
raw_name: SingleTerm(
|
|
raw_name=raw_name,
|
|
beta=Estimate(raw_param, raw_ci[0], raw_ci[1]),
|
|
pvalue=raw_p
|
|
)
|
|
for raw_name, raw_param, raw_ci, raw_p in zip(ctv.params_.index, ctv.params_, ctv.confidence_intervals_.itertuples(index=False), ctv._compute_p_values())
|
|
}
|
|
|
|
result.vcov = ctv.variance_matrix_
|
|
|
|
return result
|
|
|
|
# ------------------
|
|
# Brant test helpers
|
|
|
|
class _Dummy: pass
|
|
|
|
def _wald_test(param_names, params, formula, vcov, df):
|
|
# Hack! Piggyback off statsmodels to compute a Wald test
|
|
|
|
lmr = statsmodels.base.model.LikelihoodModelResults(model=None, params=None)
|
|
lmr.model = _Dummy()
|
|
lmr.model.data = _Dummy()
|
|
lmr.model.data.cov_names = param_names
|
|
lmr.params = params
|
|
lmr.df_resid = df
|
|
|
|
return lmr.wald_test(formula, cov_p=vcov, use_f=False, scalar=True)
|
|
|
|
class BrantResult:
|
|
"""
|
|
Result of a Brant test for ordinal regression
|
|
|
|
See :meth:`OrdinalLogit.brant`.
|
|
"""
|
|
|
|
def __init__(self, tests):
|
|
#: Results for Brant test on each coefficient (*Dict[str, ChiSquaredResult]*)
|
|
self.tests = tests
|
|
|
|
def __repr__(self):
|
|
if config.repr_is_summary:
|
|
return self.summary()
|
|
return super().__repr__()
|
|
|
|
def _repr_html_(self):
|
|
out = '<table><caption>Brant Test Results</caption><thead><tr><th></th><th style="text-align:center"><i>χ</i><sup>2</sup></th><th style="text-align:center">df</th><th style="text-align:center"><i>p</i></th></thead><tbody>'
|
|
|
|
for raw_name, test in self.tests.items():
|
|
out += '<tr><th>{}</th><td>{:.2f}</td><td>{:.0f}</td><td style="text-align:left">{}</td></tr>'.format(raw_name, test.statistic, test.dof, fmt_p(test.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
|
|
|
|
out += '</tbody></table>'
|
|
return out
|
|
|
|
def summary(self):
|
|
"""
|
|
Return a stringified summary of the *χ*:sup:`2` test
|
|
|
|
:rtype: str
|
|
"""
|
|
|
|
table = pd.DataFrame([
|
|
['{:.2f}'.format(test.statistic), '{:.0f}'.format(test.dof), fmt_p(test.pvalue, PValueStyle.TABULAR)]
|
|
for test in self.tests.values()
|
|
], index=self.tests.keys(), columns=['χ² ', 'df', 'p '])
|
|
|
|
return str(table)
|