732 lines
25 KiB
Python
732 lines
25 KiB
Python
# scipy-yli: Helpful SciPy utilities and recipes
|
|
# Copyright © 2022 Lee Yingtong Li (RunasSudo)
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from scipy import stats
|
|
import statsmodels
|
|
import statsmodels.api as sm
|
|
from statsmodels.iolib.table import SimpleTable
|
|
from statsmodels.stats.outliers_influence import variance_inflation_factor
|
|
|
|
from datetime import datetime
|
|
import itertools
|
|
import warnings
|
|
|
|
from .bayes_factors import BayesFactor, bayesfactor_afbf
|
|
from .config import config
|
|
from .sig_tests import FTestResult
|
|
from .utils import Estimate, PValueStyle, check_nan, cols_for_formula, convert_pandas_nullable, fmt_p, formula_factor_ref_category, parse_patsy_term
|
|
|
|
def vif(df, formula=None, *, nan_policy='warn'):
|
|
"""
|
|
Calculate the variance inflation factor (VIF) for each variable in *df*
|
|
|
|
:param df: Data to calculate the VIF for
|
|
:type df: DataFrame
|
|
:param formula: If specified, calculate the VIF only for the variables in the formula
|
|
:type formula: str
|
|
|
|
:return: The variance inflation factors
|
|
:rtype: Series
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame({
|
|
'D': [68.58, 67.33, 67.33, ...],
|
|
'T1': [14, 10, 10, ...],
|
|
'T2': [46, 73, 85, ...],
|
|
...
|
|
})
|
|
yli.vif(df[['D', 'T1', 'T2', ...]])
|
|
|
|
.. code-block:: text
|
|
|
|
D 8.318301
|
|
T1 6.081590
|
|
T2 2.457122
|
|
...
|
|
dtype: float64
|
|
|
|
The output shows the variance inflation factor for each variable in *df*.
|
|
"""
|
|
|
|
if formula:
|
|
# Only consider columns in the formula
|
|
df = df[cols_for_formula(formula, df)]
|
|
|
|
# Check for/clean NaNs
|
|
df = check_nan(df, nan_policy)
|
|
|
|
# Convert all to float64 otherwise statsmodels chokes with "ufunc 'isfinite' not supported for the input types ..."
|
|
df = pd.get_dummies(df, drop_first=True) # Convert categorical dtypes
|
|
df = df.astype('float64') # Convert all other dtypes
|
|
|
|
# Add intercept column
|
|
orig_columns = list(df.columns)
|
|
df['Intercept'] = [1] * len(df)
|
|
|
|
vifs = {}
|
|
|
|
for i, col in enumerate(orig_columns):
|
|
vifs[col] = variance_inflation_factor(df, i)
|
|
|
|
return pd.Series(vifs)
|
|
|
|
# ----------
|
|
# Regression
|
|
|
|
class LikelihoodRatioTestResult:
|
|
"""
|
|
Result of a likelihood ratio test for regression
|
|
|
|
See :meth:`RegressionResult.lrtest_null`.
|
|
"""
|
|
|
|
def __init__(self, statistic, dof, pvalue):
|
|
#: Likelihood ratio test statistic (*float*)
|
|
self.statistic = statistic
|
|
#: Degrees of freedom for the likelihood ratio test statistic (*int*)
|
|
self.dof = dof
|
|
#: *p* value for the likelihood ratio test (*float*)
|
|
self.pvalue = pvalue
|
|
|
|
def __repr__(self):
|
|
if config.repr_is_summary:
|
|
return self.summary()
|
|
return super().__repr__()
|
|
|
|
def _repr_html_(self):
|
|
return 'LR({}) = {:.2f}; <i>p</i> {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION | PValueStyle.HTML))
|
|
|
|
def summary(self):
|
|
"""
|
|
Return a stringified summary of the likelihood ratio test
|
|
|
|
:rtype: str
|
|
"""
|
|
|
|
return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION))
|
|
|
|
class RegressionResult:
|
|
"""
|
|
Result of a regression
|
|
|
|
See :func:`yli.regress`.
|
|
"""
|
|
|
|
def __init__(self,
|
|
raw_result,
|
|
full_name, model_name, fit_method,
|
|
dep, nobs, dof_model, fitted_dt, cov_type,
|
|
terms,
|
|
llf, llnull,
|
|
dof_resid, rsquared, f_statistic,
|
|
exp
|
|
):
|
|
#: Raw result from statsmodels *model.fit*
|
|
self.raw_result = raw_result
|
|
|
|
# Information for display
|
|
#: Full name of the regression model type (*str*)
|
|
self.full_name = full_name
|
|
#: Short name of the regression model type (*str*)
|
|
self.model_name = model_name
|
|
#: Method for fitting the regression model (*str*)
|
|
self.fit_method = fit_method
|
|
|
|
# Basic fitted model information
|
|
#: Name of the dependent variable (*str*)
|
|
self.dep = dep
|
|
#: Number of observations (*int*)
|
|
self.nobs = nobs
|
|
#: Degrees of freedom for the model (*int*)
|
|
self.dof_model = dof_model
|
|
#: Date and time of fitting the model (Python *datetime*)
|
|
self.fitted_dt = fitted_dt
|
|
#: Method for computing the covariance matrix (*str*)
|
|
self.cov_type = cov_type
|
|
|
|
# Regression coefficients/p values
|
|
#: Coefficients and *p* values for each term in the model (*dict* of :class:`SingleTerm` or :class:`CategoricalTerm`)
|
|
self.terms = terms
|
|
|
|
# Model log-likelihood
|
|
#: Log-likelihood of fitted model (*float*)
|
|
self.llf = llf
|
|
#: Log-likelihood of null model (*float*)
|
|
self.llnull = llnull
|
|
|
|
# Extra statistics (not all regression models have these)
|
|
#: Degrees of freedom for the residuals (*int*; *None* if N/A)
|
|
self.dof_resid = dof_resid
|
|
#: *R*:sup:`2` statistic (*float*; *None* if N/A)
|
|
self.rsquared = rsquared
|
|
#: *F* statistic (*float*; *None* if N/A)
|
|
self.f_statistic = f_statistic
|
|
|
|
# Config for display style
|
|
#: See :func:`yli.regress`
|
|
self.exp = exp
|
|
|
|
@property
|
|
def pseudo_rsquared(self):
|
|
"""McFadden's pseudo *R*:sup:`2` statistic"""
|
|
|
|
return 1 - self.llf/self.llnull
|
|
|
|
def lrtest_null(self):
|
|
"""
|
|
Compute the likelihood ratio test comparing the model to the null model
|
|
|
|
:rtype: :class:`LikelihoodRatioTestResult`
|
|
"""
|
|
|
|
statistic = -2 * (self.llnull - self.llf)
|
|
pvalue = 1 - stats.chi2.cdf(statistic, self.dof_model)
|
|
|
|
return LikelihoodRatioTestResult(statistic, self.dof_model, pvalue)
|
|
|
|
def ftest(self):
|
|
"""
|
|
Perform the *F* test that all slopes are 0
|
|
|
|
:rtype: :class:`yli.sig_tests.FTestResult`
|
|
"""
|
|
|
|
pvalue = 1 - stats.f(self.dof_model, self.dof_resid).cdf(self.f_statistic)
|
|
return FTestResult(self.f_statistic, self.dof_model, self.dof_resid, pvalue)
|
|
|
|
def bayesfactor_beta_zero(self, term):
|
|
"""
|
|
Compute a Bayes factor testing the hypothesis that the given beta is 0
|
|
|
|
Uses the R *BFpack* library.
|
|
|
|
Requires the regression to be from statsmodels.
|
|
The term must be specified as the *raw name* from the statsmodels regression, available via :attr:`RegressionResult.raw_result`.
|
|
|
|
:param term: Raw name of the term to be tested
|
|
:type term: str
|
|
|
|
:rtype: :class:`yli.bayes_factors.BayesFactor`
|
|
"""
|
|
|
|
# FIXME: Allow specifying our renamed terms
|
|
|
|
# Get parameters required for AFBF
|
|
params = pd.Series({raw_name.replace('[', '_').replace(']', '_'): beta for raw_name, beta in self.raw_result.params.items()})
|
|
cov = self.raw_result.cov_params()
|
|
|
|
# Compute BF matrix
|
|
bf01 = bayesfactor_afbf(params, cov, self.nobs, '{} = 0'.format(term.replace('[', '_').replace(']', '_')))
|
|
bf01 = BayesFactor(bf01.factor, '0', '{} = 0'.format(term), '1', '{} ≠ 0'.format(term))
|
|
|
|
if bf01.factor >= 1:
|
|
return bf01
|
|
else:
|
|
return bf01.invert()
|
|
|
|
def _header_table(self, html):
|
|
"""Return the entries for the header table"""
|
|
|
|
# Left column
|
|
left_col = []
|
|
|
|
left_col.append(('Dep. Variable:', self.dep))
|
|
left_col.append(('Model:', self.model_name))
|
|
left_col.append(('Method:', self.fit_method))
|
|
left_col.append(('Date:', self.fitted_dt.strftime('%Y-%m-%d')))
|
|
left_col.append(('Time:', self.fitted_dt.strftime('%H:%M:%S')))
|
|
left_col.append(('Std. Errors:', 'Non-Robust' if self.cov_type == 'nonrobust' else self.cov_type.upper()))
|
|
|
|
# Right column
|
|
right_col = []
|
|
|
|
right_col.append(('No. Observations:', format(self.nobs, '.0f')))
|
|
right_col.append(('Df. Model:', format(self.dof_model, '.0f')))
|
|
if self.dof_resid:
|
|
right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f')))
|
|
if self.rsquared:
|
|
right_col.append(('<i>R</i><sup>2</sup>:' if html else 'R²:', format(self.rsquared, '.2f')))
|
|
else:
|
|
right_col.append(('Pseudo <i>R</i><sup>2</sup>:' if html else 'Pseudo R²:', format(self.pseudo_rsquared, '.2f')))
|
|
if self.f_statistic:
|
|
# Report the F test if available
|
|
f_result = self.ftest()
|
|
|
|
if html:
|
|
right_col.append(('<i>F</i>:', format(f_result.statistic, '.2f')))
|
|
right_col.append(('<i>p</i> (<i>F</i>):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML)))
|
|
else:
|
|
right_col.append(('F:', format(f_result.statistic, '.2f')))
|
|
right_col.append(('p (F):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY)))
|
|
else:
|
|
# Otherwise report likelihood ratio test as overall test
|
|
lrtest_result = self.lrtest_null()
|
|
|
|
right_col.append(('LL-Model:', format(self.llf, '.2f')))
|
|
right_col.append(('LL-Null:', format(self.llnull, '.2f')))
|
|
if html:
|
|
right_col.append(('<i>p</i> (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML)))
|
|
else:
|
|
right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY)))
|
|
|
|
return left_col, right_col
|
|
|
|
def __repr__(self):
|
|
if config.repr_is_summary:
|
|
return self.summary()
|
|
return super().__repr__()
|
|
|
|
def _repr_html_(self):
|
|
# Render header table
|
|
left_col, right_col = self._header_table(html=True)
|
|
|
|
out = '<table><caption>{} Results</caption>'.format(self.full_name)
|
|
for left_cell, right_cell in itertools.zip_longest(left_col, right_col):
|
|
out += '<tr><th>{}</th><td>{}</td><th>{}</th><td>{}</td></tr>'.format(
|
|
left_cell[0] if left_cell else '',
|
|
left_cell[1] if left_cell else '',
|
|
right_cell[0] if right_cell else '',
|
|
right_cell[1] if right_cell else ''
|
|
)
|
|
out += '</table>'
|
|
|
|
# Render coefficients table
|
|
out += '<table><tr><th></th><th style="text-align:center">{}</th><th colspan="3" style="text-align:center">({:g}% CI)</th><th style="text-align:center"><i>p</i></th></tr>'.format('exp(<i>β</i>)' if self.exp else '<i>β</i>', (1-config.alpha)*100)
|
|
|
|
for term_name, term in self.terms.items():
|
|
if isinstance(term, SingleTerm):
|
|
# Single term
|
|
|
|
# Exponentiate beta if requested
|
|
beta = term.beta
|
|
if self.exp:
|
|
beta = np.exp(beta)
|
|
|
|
out += '<tr><th>{}</th><td>{:.2f}</td><td style="padding-right:0">({:.2f}</td><td>–</td><td style="padding-left:0">{:.2f})</td><td style="text-align:left">{}</td></tr>'.format(term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
|
|
elif isinstance(term, CategoricalTerm):
|
|
# Categorical term
|
|
out += '<tr><th>{}</th><td></td><td style="padding-right:0"></td><td></td><td style="padding-left:0"></td><td></td></tr>'.format(term_name)
|
|
|
|
# Render reference category
|
|
out += '<tr><td style="text-align:right;font-style:italic">{}</td><td>Ref.</td><td style="padding-right:0"></td><td></td><td style="padding-left:0"></td><td></td></tr>'.format(term.ref_category)
|
|
|
|
# Loop over terms
|
|
for sub_term_name, sub_term in term.categories.items():
|
|
# Exponentiate beta if requested
|
|
beta = sub_term.beta
|
|
if self.exp:
|
|
beta = np.exp(beta)
|
|
|
|
out += '<tr><td style="text-align:right;font-style:italic">{}</td><td>{:.2f}</td><td style="padding-right:0">({:.2f}</td><td>–</td><td style="padding-left:0">{:.2f})</td><td style="text-align:left">{}</td></tr>'.format(sub_term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(sub_term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
|
|
else:
|
|
raise Exception('Attempt to render unknown term type')
|
|
|
|
out += '</table>'
|
|
|
|
# TODO: Have a detailed view which shows SE, t/z, etc.
|
|
|
|
return out
|
|
|
|
def summary(self):
|
|
"""
|
|
Return a stringified summary of the regression model
|
|
|
|
:rtype: str
|
|
"""
|
|
|
|
# Render header table
|
|
left_col, right_col = self._header_table(html=False)
|
|
|
|
# Ensure equal sizes for SimpleTable
|
|
if len(right_col) > len(left_col):
|
|
left_col.extend([('', '')] * (len(right_col) - len(left_col)))
|
|
elif len(left_col) > len(right_col):
|
|
right_col.extend([('', '')] * (len(left_col) - len(right_col)))
|
|
|
|
table1 = SimpleTable(np.concatenate([left_col, right_col], axis=1), title='{} Results'.format(self.full_name))
|
|
table1.insert_stubs(2, [' | '] * len(left_col))
|
|
|
|
# Get rid of last line (merge with next table)
|
|
table1_lines = table1.as_text().splitlines(keepends=False)
|
|
out = '\n'.join(table1_lines[:-1]) + '\n'
|
|
|
|
# Render coefficients table
|
|
table_data = []
|
|
|
|
for term_name, term in self.terms.items():
|
|
if isinstance(term, SingleTerm):
|
|
# Single term
|
|
|
|
# Exponentiate beta if requested
|
|
beta = term.beta
|
|
if self.exp:
|
|
beta = np.exp(beta)
|
|
|
|
# Add some extra padding
|
|
table_data.append([term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(term.pvalue, PValueStyle.TABULAR)])
|
|
elif isinstance(term, CategoricalTerm):
|
|
# Categorical term
|
|
table_data.append([term_name + ' ', '', '', '', '', ''])
|
|
|
|
# Render reference category
|
|
table_data.append(['{} '.format(term.ref_category), 'Ref.', '', '', '', ''])
|
|
|
|
# Loop over terms
|
|
for sub_term_name, sub_term in term.categories.items():
|
|
# Exponentiate beta if requested
|
|
beta = sub_term.beta
|
|
if self.exp:
|
|
beta = np.exp(beta)
|
|
|
|
table_data.append([sub_term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(sub_term.pvalue, PValueStyle.TABULAR)])
|
|
else:
|
|
raise Exception('Attempt to render unknown term type')
|
|
|
|
table2 = SimpleTable(data=table_data, headers=['', 'exp(β)' if self.exp else 'β', '', '\ue000', '', ' p']) # U+E000 is in Private Use Area, mark middle of CI column
|
|
table2_text = table2.as_text().replace(' \ue000 ', '({:g}% CI)'.format((1-config.alpha)*100)) # Render heading in the right spot
|
|
table2_lines = table2_text.splitlines(keepends=False)
|
|
|
|
# Render divider line between 2 tables
|
|
max_table_len = max(len(table1_lines[-1]), len(table2_lines[-1]))
|
|
out += '=' * max_table_len + '\n'
|
|
|
|
out += '\n'.join(table2_lines[1:])
|
|
|
|
return out
|
|
|
|
class SingleTerm:
|
|
"""A term in a :class:`RegressionResult` which is a single term"""
|
|
|
|
def __init__(self, raw_name, beta, pvalue):
|
|
#: Raw name of the term (*str*; e.g. in :attr:`RegressionResult.raw_result`)
|
|
self.raw_name = raw_name
|
|
#: :class:`yli.utils.Estimate` of the coefficient
|
|
self.beta = beta
|
|
#: *p* value for the coefficient (*float*)
|
|
self.pvalue = pvalue
|
|
|
|
class CategoricalTerm:
|
|
"""A group of terms in a :class:`RegressionResult` corresponding to a categorical variable"""
|
|
|
|
def __init__(self, categories, ref_category):
|
|
#: Terms for each of the categories, excluding the reference category (*dict* of :class:`SingleTerm`)
|
|
self.categories = categories
|
|
#: Name of the reference category (*str*)
|
|
self.ref_category = ref_category
|
|
|
|
def regress(
|
|
model_class, df, dep, formula, *,
|
|
nan_policy='warn',
|
|
model_kwargs=None, fit_kwargs=None,
|
|
family=None, # common model_kwargs
|
|
cov_type=None, maxiter=None, start_params=None, # common fit_kwargs
|
|
bool_baselevels=False, exp=None
|
|
):
|
|
"""
|
|
Fit a statsmodels regression model
|
|
|
|
:param model_class: Type of regression model to fit
|
|
:type model_class: statsmodels model class
|
|
:param df: Data to perform regression on
|
|
:type df: DataFrame
|
|
:param dep: Column in *df* for the dependent variable (numeric)
|
|
:type dep: str
|
|
:param formula: Patsy formula for the regression model
|
|
:type formula: str
|
|
:param nan_policy: How to handle *nan* values (see :ref:`nan-handling`)
|
|
:type nan_policy: str
|
|
:param model_kwargs: Keyword arguments to pass to *model_class* constructor
|
|
:type model_kwargs: dict
|
|
:param fit_kwargs: Keyword arguments to pass to *model.fit*
|
|
:type fit_kwargs: dict
|
|
:param family: See statsmodels *GLM* constructor
|
|
:param cov_type: See statsmodels *model.fit*
|
|
:param maxiter: See statsmodels *model.fit*
|
|
:param start_params: See statsmodels *model.fit*
|
|
:param bool_baselevels: Show reference categories for boolean independent variables even if reference category is *False*
|
|
:type bool_baselevels: bool
|
|
:param exp: Report exponentiated parameters rather than raw parameters, default (*None*) is to autodetect based on *model_class*
|
|
:type exp: bool
|
|
|
|
:rtype: :class:`yli.regress.RegressionResult`
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame({
|
|
'Unhealthy': [False, False, False, ...],
|
|
'Fibrinogen': [2.52, 2.46, 2.29, ...],
|
|
'GammaGlobulin': [38, 36, 36, ...]
|
|
})
|
|
yli.regress(sm.Logit, df, 'Unhealthy', 'Fibrinogen + GammaGlobulin')
|
|
|
|
.. code-block:: text
|
|
|
|
Logistic Regression Results
|
|
======================================================
|
|
Dep. Variable: Unhealthy | No. Observations: 32
|
|
Model: Logit | Df. Model: 2
|
|
Method: MLE | Df. Residuals: 29
|
|
Date: 2022-10-18 | Pseudo R²: 0.26
|
|
Time: 19:00:34 | LL-Model: -11.47
|
|
Std. Errors: Non-Robust | LL-Null: -15.44
|
|
| p (LR): 0.02*
|
|
======================================================
|
|
exp(β) (95% CI) p
|
|
-----------------------------------------------
|
|
(Intercept) 0.00 (0.00 - 0.24) 0.03*
|
|
Fibrinogen 6.80 (1.01 - 45.79) 0.049*
|
|
GammaGlobulin 1.17 (0.92 - 1.48) 0.19
|
|
-----------------------------------------------
|
|
|
|
The output summarises the results of the regression.
|
|
Note that the parameter estimates are automatically exponentiated.
|
|
For example, the odds ratio for unhealthiness per unit increase in fibrinogen is 6.80, with 95% confidence interval 1.01–45.79, and is significant with *p* value 0.049.
|
|
"""
|
|
|
|
# Populate model_kwargs
|
|
if model_kwargs is None:
|
|
model_kwargs = {}
|
|
if family is not None:
|
|
model_kwargs['family'] = family
|
|
|
|
# Populate fit_kwargs
|
|
if fit_kwargs is None:
|
|
fit_kwargs = {}
|
|
if cov_type is not None:
|
|
fit_kwargs['cov_type'] = cov_type
|
|
if maxiter is not None:
|
|
fit_kwargs['maxiter'] = maxiter
|
|
if start_params is not None:
|
|
fit_kwargs['start_params'] = start_params
|
|
|
|
# Autodetect whether to exponentiate
|
|
if exp is None:
|
|
if model_class in (sm.Logit, sm.Poisson, PenalisedLogit):
|
|
exp = True
|
|
else:
|
|
exp = False
|
|
|
|
# Check for/clean NaNs
|
|
df = df[[dep] + cols_for_formula(formula, df)]
|
|
df = check_nan(df, nan_policy)
|
|
|
|
# Ensure numeric type for dependent variable
|
|
if df[dep].dtype != 'float64':
|
|
df[dep] = df[dep].astype('float64')
|
|
|
|
# Convert pandas nullable types for independent variables as this breaks statsmodels
|
|
df = convert_pandas_nullable(df)
|
|
|
|
# Fit model
|
|
model = model_class.from_formula(formula=dep + ' ~ ' + formula, data=df, **model_kwargs)
|
|
result = model.fit(disp=False, **fit_kwargs)
|
|
|
|
if isinstance(result, RegressionResult):
|
|
# Already processed!
|
|
result.exp = exp
|
|
return result
|
|
|
|
# Check convergence
|
|
if hasattr(result, 'mle_retvals') and not result.mle_retvals['converged']:
|
|
warnings.warn('Maximum likelihood estimation failed to converge. Check raw_result.mle_retvals.')
|
|
|
|
# Process terms
|
|
terms = {}
|
|
|
|
confint = result.conf_int(config.alpha)
|
|
|
|
for raw_name, raw_beta in result.params.items():
|
|
beta = Estimate(raw_beta, confint[0][raw_name], confint[1][raw_name])
|
|
|
|
# Rename terms
|
|
if raw_name == 'Intercept':
|
|
# Intercept term (single term)
|
|
term = '(Intercept)'
|
|
terms[term] = SingleTerm(raw_name, beta, result.pvalues[raw_name])
|
|
else:
|
|
# Parse if required
|
|
factor, column, contrast = parse_patsy_term(formula, df, raw_name)
|
|
|
|
if contrast is not None:
|
|
# Categorical term
|
|
|
|
if bool_baselevels is False and contrast == 'True' and set(df[column].unique()) == set([True, False]):
|
|
# Treat as single term
|
|
terms[column] = SingleTerm(raw_name, beta, result.pvalues[raw_name])
|
|
else:
|
|
# Add a new categorical term if not exists
|
|
if column not in terms:
|
|
ref_category = formula_factor_ref_category(formula, df, factor)
|
|
terms[column] = CategoricalTerm({}, ref_category)
|
|
|
|
terms[column].categories[contrast] = SingleTerm(raw_name, beta, result.pvalues[raw_name])
|
|
else:
|
|
# Single term
|
|
terms[column] = SingleTerm(raw_name, beta, result.pvalues[raw_name])
|
|
|
|
# Fit null model (for llnull)
|
|
if hasattr(result, 'llnull'):
|
|
llnull = result.llnull
|
|
else:
|
|
result_null = model_class.from_formula(formula=dep + ' ~ 1', data=df).fit()
|
|
llnull = result_null.llf
|
|
|
|
# Parse raw regression results (to get fit method)
|
|
header_entries = np.vectorize(str.strip)(np.concatenate(np.split(np.array(result.summary().tables[0].data), 2, axis=1)))
|
|
header_dict = {x[0]: x[1] for x in header_entries}
|
|
|
|
# Get full name to display
|
|
if model_class is sm.Logit:
|
|
full_name = 'Logistic Regression'
|
|
else:
|
|
full_name = '{} Regression'.format(model_class.__name__)
|
|
if fit_kwargs.get('cov_type', 'nonrobust') != 'nonrobust':
|
|
full_name = 'Robust {}'.format(full_name)
|
|
|
|
return RegressionResult(
|
|
result,
|
|
full_name, model_class.__name__, header_dict['Method:'],
|
|
dep, result.nobs, result.df_model, datetime.now(), result.cov_type,
|
|
terms,
|
|
result.llf, llnull,
|
|
getattr(result, 'df_resid', None), getattr(result, 'rsquared', None), getattr(result, 'fvalue', None),
|
|
exp
|
|
)
|
|
|
|
def logit_then_regress(model_class, df, dep, formula, *, nan_policy='warn', **kwargs):
|
|
"""
|
|
Perform logistic regression, then use parameters as start parameters for desired regression
|
|
|
|
:param model_class: Type of regression model to fit
|
|
:type model_class: statsmodels model class
|
|
:param df: Data to perform regression on
|
|
:type df: DataFrame
|
|
:param dep: Column in *df* for the dependent variable (numeric)
|
|
:type dep: str
|
|
:param formula: Patsy formula for the regression model
|
|
:type formula: str
|
|
:param nan_policy: How to handle *nan* values (see :ref:`nan-handling`)
|
|
:type nan_policy: str
|
|
:param kwargs: Passed through to :func:`yli.regress`
|
|
|
|
:rtype: :class:`yli.regress.RegressionResult`
|
|
"""
|
|
|
|
# Check for/clean NaNs
|
|
# Do this once here so we only get 1 warning
|
|
df = df[[dep] + cols_for_formula(formula, df)]
|
|
df = check_nan(df, nan_policy)
|
|
|
|
# Perform logistic regression
|
|
logit_result = regress(sm.Logit, df, dep, formula, **kwargs)
|
|
logit_params = logit_result.raw_result.params
|
|
|
|
# Check convergence
|
|
if not logit_result.raw_result.mle_retvals['converged']:
|
|
return None
|
|
|
|
# Perform desired regression
|
|
return regress(model_class, df, dep, formula, start_params=logit_params, **kwargs)
|
|
|
|
# -----------------------------
|
|
# Penalised logistic regression
|
|
|
|
class PenalisedLogit(statsmodels.discrete.discrete_model.BinaryModel):
|
|
"""
|
|
statsmodel-compatible model for computing Firth penalised logistic regression
|
|
|
|
Uses the R *logistf* library.
|
|
|
|
This class should be used in conjunction with :func:`yli.regress`.
|
|
|
|
**Example:**
|
|
|
|
.. code-block::
|
|
|
|
df = pd.DataFrame({
|
|
'Pred': [1] * 20 + [0] * 220,
|
|
'Outcome': [1] * 40 + [0] * 200
|
|
})
|
|
yli.regress(yli.PenalisedLogit, df, 'Outcome', 'Pred', exp=False)
|
|
|
|
.. code-block:: text
|
|
|
|
Penalised Logistic Regression Results
|
|
=========================================================
|
|
Dep. Variable: Outcome | No. Observations: 240
|
|
Model: Logit | Df. Model: 1
|
|
Method: Penalised ML | Pseudo R²: 0.37
|
|
Date: 2022-10-19 | LL-Model: -66.43
|
|
Time: 07:50:40 | LL-Null: -105.91
|
|
Std. Errors: Non-Robust | p (LR): <0.001*
|
|
=========================================================
|
|
β (95% CI) p
|
|
---------------------------------------------
|
|
(Intercept) -2.28 (-2.77 - -1.85) <0.001*
|
|
Pred 5.99 (3.95 - 10.85) <0.001*
|
|
---------------------------------------------
|
|
|
|
The output summarises the result of the regression.
|
|
The summary table shows that a penalised method was applied.
|
|
Note that because `exp=False` was passed, the parameter estimates are not automatically exponentiated.
|
|
"""
|
|
|
|
def fit(self, disp=False):
|
|
import rpy2.robjects as ro
|
|
import rpy2.robjects.packages
|
|
import rpy2.robjects.pandas2ri
|
|
|
|
# Assume data is already cleaned from regress()
|
|
df = self.data.frame.copy()
|
|
|
|
# Convert bool to int otherwise rpy2 chokes
|
|
df = df.replace({False: 0, True: 1})
|
|
|
|
# Import logistf
|
|
ro.packages.importr('logistf')
|
|
|
|
with ro.conversion.localconverter(ro.default_converter + ro.pandas2ri.converter):
|
|
with ro.local_context() as lc:
|
|
# Convert DataFrame to R
|
|
lc['df'] = df
|
|
|
|
# Transfer other parameters to R
|
|
lc['formula_'] = self.formula
|
|
lc['alpha'] = config.alpha
|
|
|
|
# Fit the model
|
|
model = ro.r('logistf(formula_, data=df, alpha=alpha)')
|
|
|
|
# TODO: Handle categorical terms?
|
|
terms = {t: SingleTerm(t, Estimate(b, ci0, ci1), p) for t, b, ci0, ci1, p in zip(model['terms'], model['coefficients'], model['ci.lower'], model['ci.upper'], model['prob'])}
|
|
|
|
return RegressionResult(
|
|
model,
|
|
'Penalised Logistic Regression', 'Logit', 'Penalised ML',
|
|
self.endog_names, model['n'][0], model['df'][0], datetime.now(), 'nonrobust',
|
|
terms,
|
|
model['loglik'][0], model['loglik'][1],
|
|
None, None, None,
|
|
None # Set exp in regress()
|
|
)
|