# scipy-yli: Helpful SciPy utilities and recipes # Copyright © 2022 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import numpy as np import pandas as pd from scipy import stats import statsmodels import statsmodels.api as sm from statsmodels.iolib.table import SimpleTable from statsmodels.stats.outliers_influence import variance_inflation_factor from datetime import datetime import itertools from .bayes_factors import BayesFactor, bayesfactor_afbf from .config import config from .sig_tests import FTestResult from .utils import Estimate, check_nan, cols_for_formula, fmt_p, formula_factor_ref_category, parse_patsy_term def vif(df, formula=None, *, nan_policy='warn'): """ Calculate the variance inflation factor (VIF) for each variable in *df* :param df: Data to calculate the VIF for :type df: DataFrame :param formula: If specified, calculate the VIF only for the variables in the formula :type formula: str :return: The variance inflation factor :rtype: float """ if formula: # Only consider columns in the formula df = df[cols_for_formula(formula, df)] # Check for/clean NaNs df = check_nan(df, nan_policy) # Convert all to float64 otherwise statsmodels chokes with "ufunc 'isfinite' not supported for the input types ..." df = pd.get_dummies(df, drop_first=True) # Convert categorical dtypes df = df.astype('float64') # Convert all other dtypes # Add intercept column orig_columns = list(df.columns) df['Intercept'] = [1] * len(df) vifs = {} for i, col in enumerate(orig_columns): vifs[col] = variance_inflation_factor(df, i) return pd.Series(vifs) # ---------- # Regression class LikelihoodRatioTestResult: """ Result of a likelihood ratio test for regression See :meth:`RegressionResult.lrtest_null`. """ def __init__(self, statistic, dof, pvalue): #: Likelihood ratio test statistic (*float*) self.statistic = statistic #: Degrees of freedom for the likelihood ratio test statistic (*int*) self.dof = dof #: *p* value for the likelihood ratio test (*float*) self.pvalue = pvalue def __repr__(self): if config.repr_is_summary: return self.summary() return super().__repr__() def _repr_html_(self): return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, html=True)) def summary(self): """ Return a stringified summary of the likelihood ratio test :rtype: str """ return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, html=False)) class RegressionResult: """ Result of a regression See :func:`yli.regress`. """ def __init__(self, raw_result, full_name, model_name, fit_method, dep, nobs, dof_model, fitted_dt, cov_type, terms, llf, llnull, dof_resid, rsquared, f_statistic, exp ): #: Raw result from statsmodels *model.fit* self.raw_result = raw_result # Information for display #: Full name of the regression model type (*str*) self.full_name = full_name #: Short name of the regression model type (*str*) self.model_name = model_name #: Method for fitting the regression model (*str*) self.fit_method = fit_method # Basic fitted model information #: Name of the dependent variable (*str*) self.dep = dep #: Number of observations (*int*) self.nobs = nobs #: Degrees of freedom for the model (*int*) self.dof_model = dof_model #: Date and time of fitting the model (Python *datetime*) self.fitted_dt = fitted_dt #: Method for computing the covariance matrix (*str*) self.cov_type = cov_type # Regression coefficients/p values #: Coefficients and *p* values for each term in the model (*dict* of :class:`SingleTerm` or :class:`CategoricalTerm`) self.terms = terms # Model log-likelihood #: Log-likelihood of fitted model (*float*) self.llf = llf #: Log-likelihood of null model (*float*) self.llnull = llnull # Extra statistics (not all regression models have these) #: Degrees of freedom for the residuals (*int*; *None* if N/A) self.dof_resid = dof_resid #: *R*:sup:`2` statistic (*float*; *None* if N/A) self.rsquared = rsquared #: *F* statistic (*float*; *None* if N/A) self.f_statistic = f_statistic # Config for display style #: See :func:`yli.regress` self.exp = exp @property def pseudo_rsquared(self): """McFadden's pseudo *R*:sup:`2` statistic""" return 1 - self.llf/self.llnull def lrtest_null(self): """ Compute the likelihood ratio test comparing the model to the null model :rtype: :class:`LikelihoodRatioTestResult` """ statistic = -2 * (self.llnull - self.llf) pvalue = 1 - stats.chi2.cdf(statistic, self.dof_model) return LikelihoodRatioTestResult(statistic, self.dof_model, pvalue) def ftest(self): """ Perform the *F* test that all slopes are 0 :rtype: :class:`yli.sig_tests.FTestResult` """ pvalue = 1 - stats.f(self.dof_model, self.dof_resid).cdf(self.f_statistic) return FTestResult(self.f_statistic, self.dof_model, self.dof_resid, pvalue) def bayesfactor_beta_zero(self, term): """ Compute a Bayes factor testing the hypothesis that the given beta is 0 Uses the R *BFpack* library. Requires the regression to be from statsmodels. The term must be specified as the *raw name* from the statsmodels regression, available via :attr:`RegressionResult.raw_result`. :param term: Raw name of the term to be tested :type term: str :rtype: :class:`yli.bayes_factors.BayesFactor` """ # FIXME: Allow specifying our renamed terms # Get parameters required for AFBF params = pd.Series({raw_name.replace('[', '_').replace(']', '_'): beta for raw_name, beta in self.raw_result.params.items()}) cov = self.raw_result.cov_params() # Compute BF matrix bf01 = bayesfactor_afbf(params, cov, self.nobs, '{} = 0'.format(term.replace('[', '_').replace(']', '_'))) bf01 = BayesFactor(bf01.factor, '0', '{} = 0'.format(term), '1', '{} ≠ 0'.format(term)) if bf01.factor >= 1: return bf01 else: return bf01.invert() def _header_table(self, html): """Return the entries for the header table""" # Left column left_col = [] left_col.append(('Dep. Variable:', self.dep)) left_col.append(('Model:', self.model_name)) left_col.append(('Method:', self.fit_method)) left_col.append(('Date:', self.fitted_dt.strftime('%Y-%m-%d'))) left_col.append(('Time:', self.fitted_dt.strftime('%H:%M:%S'))) left_col.append(('Std. Errors:', 'Non-Robust' if self.cov_type == 'nonrobust' else self.cov_type.upper())) # Right column right_col = [] right_col.append(('No. Observations:', format(self.nobs, '.0f'))) right_col.append(('Df. Model:', format(self.dof_model, '.0f'))) if self.dof_resid: right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f'))) if self.rsquared: right_col.append(('R2:' if html else 'R²:', format(self.rsquared, '.2f'))) else: right_col.append(('Pseudo R2:' if html else 'Pseudo R²:', format(self.pseudo_rsquared, '.2f'))) if self.f_statistic: # Report the F test if available f_result = self.ftest() if html: right_col.append(('F:', format(f_result.statistic, '.2f'))) right_col.append(('p (F):', fmt_p(f_result.pvalue, html=True, tabular=True))) else: right_col.append(('F:', format(f_result.statistic, '.2f'))) right_col.append(('p (F):', fmt_p(f_result.pvalue, html=False, tabular=True))) else: # Otherwise report likelihood ratio test as overall test lrtest_result = self.lrtest_null() right_col.append(('LL-Model:', format(self.llf, '.2f'))) right_col.append(('LL-Null:', format(self.llnull, '.2f'))) if html: right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, html=True, tabular=True))) else: right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, html=False, tabular=True))) return left_col, right_col def __repr__(self): if config.repr_is_summary: return self.summary() return super().__repr__() def _repr_html_(self): # Render header table left_col, right_col = self._header_table(html=True) out = ''.format(self.full_name) for left_cell, right_cell in itertools.zip_longest(left_col, right_col): out += ''.format( left_cell[0] if left_cell else '', left_cell[1] if left_cell else '', right_cell[0] if right_cell else '', right_cell[1] if right_cell else '' ) out += '
{} Results
{}{}{}{}
' # Render coefficients table out += ''.format('exp(β)' if self.exp else 'β', (1-config.alpha)*100) for term_name, term in self.terms.items(): if isinstance(term, SingleTerm): # Single term # Exponentiate beta if requested beta = term.beta if self.exp: beta = np.exp(beta) out += ''.format(term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(term.pvalue, html=True, tabular=True)) elif isinstance(term, CategoricalTerm): # Categorical term out += ''.format(term_name) # Render reference category out += ''.format(term.ref_category) # Loop over terms for sub_term_name, sub_term in term.categories.items(): # Exponentiate beta if requested beta = sub_term.beta if self.exp: beta = np.exp(beta) out += ''.format(sub_term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(sub_term.pvalue, html=True, tabular=True)) else: raise Exception('Attempt to render unknown term type') out += '
{}({:g}% CI)p
{}{:.2f}({:.2f}{:.2f}){}
{}
{}Ref.
{}{:.2f}({:.2f}{:.2f}){}
' # TODO: Have a detailed view which shows SE, t/z, etc. return out def summary(self): """ Return a stringified summary of the regression model :rtype: str """ # Render header table left_col, right_col = self._header_table(html=False) # Ensure equal sizes for SimpleTable if len(right_col) > len(left_col): left_col.extend([('', '')] * (len(right_col) - len(left_col))) elif len(left_col) > len(right_col): right_col.extend([('', '')] * (len(left_col) - len(right_col))) table1 = SimpleTable(np.concatenate([left_col, right_col], axis=1), title='{} Results'.format(self.full_name)) table1.insert_stubs(2, [' | '] * len(left_col)) # Get rid of last line (merge with next table) table1_lines = table1.as_text().splitlines(keepends=False) out = '\n'.join(table1_lines[:-1]) + '\n' # Render coefficients table table_data = [] for term_name, term in self.terms.items(): if isinstance(term, SingleTerm): # Single term # Exponentiate beta if requested beta = term.beta if self.exp: beta = np.exp(beta) # Add some extra padding table_data.append([term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(term.pvalue, html=False, tabular=True)]) elif isinstance(term, CategoricalTerm): # Categorical term table_data.append([term_name + ' ', '', '', '', '', '']) # Render reference category table_data.append(['{} '.format(term.ref_category), 'Ref.', '', '', '', '']) # Loop over terms for sub_term_name, sub_term in term.categories.items(): # Exponentiate beta if requested beta = sub_term.beta if self.exp: beta = np.exp(beta) table_data.append([sub_term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(sub_term.pvalue, html=False, tabular=True)]) else: raise Exception('Attempt to render unknown term type') table2 = SimpleTable(data=table_data, headers=['', 'exp(β)' if self.exp else 'β', '', '\ue000', '', ' p']) # U+E000 is in Private Use Area, mark middle of CI column table2_text = table2.as_text().replace(' \ue000 ', '({:g}% CI)'.format((1-config.alpha)*100)) # Render heading in the right spot table2_lines = table2_text.splitlines(keepends=False) # Render divider line between 2 tables max_table_len = max(len(table1_lines[-1]), len(table2_lines[-1])) out += '=' * max_table_len + '\n' out += '\n'.join(table2_lines[1:]) return out class SingleTerm: """A term in a :class:`RegressionResult` which is a single term""" def __init__(self, raw_name, beta, pvalue): #: Raw name of the term (*str*; e.g. in :attr:`RegressionResult.raw_result`) self.raw_name = raw_name #: :class:`yli.utils.Estimate` of the coefficient self.beta = beta #: *p* value for the coefficient (*float*) self.pvalue = pvalue class CategoricalTerm: """A group of terms in a :class:`RegressionResult` corresponding to a categorical variable""" def __init__(self, categories, ref_category): #: Terms for each of the categories, excluding the reference category (*dict* of :class:`SingleTerm`) self.categories = categories #: Name of the reference category (*str*) self.ref_category = ref_category def regress( model_class, df, dep, formula, *, nan_policy='warn', model_kwargs=None, fit_kwargs=None, family=None, # common model_kwargs cov_type=None, maxiter=None, start_params=None, # common fit_kwargs bool_baselevels=False, exp=None ): """ Fit a statsmodels regression model :param model_class: Type of regression model to fit :type model_class: statsmodels model class :param df: Data to perform regression on :type df: DataFrame :param dep: Column in *df* for the dependent variable (numeric) :type dep: str :param formula: Patsy formula for the regression model :type formula: str :param nan_policy: How to handle *nan* values (see :ref:`nan-handling`) :type nan_policy: str :param model_kwargs: Keyword arguments to pass to *model_class* constructor :type model_kwargs: dict :param fit_kwargs: Keyword arguments to pass to *model.fit* :type fit_kwargs: dict :param family: See statsmodels *GLM* constructor :param cov_type: See statsmodels *model.fit* :param maxiter: See statsmodels *model.fit* :param start_params: See statsmodels *model.fit* :param bool_baselevels: Show reference categories for boolean independent variables even if reference category is *False* :type bool_baselevels: bool :param exp: Report exponentiated parameters rather than raw parameters, default (*None*) is to autodetect based on *model_class* :type exp: bool :rtype: :class:`yli.regress.RegressionResult` """ # Populate model_kwargs if model_kwargs is None: model_kwargs = {} if family is not None: model_kwargs['family'] = family # Populate fit_kwargs if fit_kwargs is None: fit_kwargs = {} if cov_type is not None: fit_kwargs['cov_type'] = cov_type if maxiter is not None: fit_kwargs['maxiter'] = maxiter if start_params is not None: fit_kwargs['start_params'] = start_params # Autodetect whether to exponentiate if exp is None: if model_class in (sm.Logit, sm.Poisson, PenalisedLogit): exp = True else: exp = False # Check for/clean NaNs df = df[[dep] + cols_for_formula(formula, df)] df = check_nan(df, nan_policy) # Ensure numeric type for dependent variable if df[dep].dtype != 'float64': df[dep] = df[dep].astype('float64') # Convert pandas nullable types for independent variables for col in df.columns: if df[col].dtype == 'Int64': df[col] = df[col].astype('float64') # Fit model model = model_class.from_formula(formula=dep + ' ~ ' + formula, data=df, **model_kwargs) result = model.fit(disp=False, **fit_kwargs) if isinstance(result, RegressionResult): # Already processed! result.exp = exp return result # Check convergence if hasattr(result, 'mle_retvals') and not result.mle_retvals['converged']: warnings.warn('Maximum likelihood estimation failed to converge. Check raw_result.mle_retvals.') # Process terms terms = {} confint = result.conf_int(config.alpha) for raw_name, raw_beta in result.params.items(): beta = Estimate(raw_beta, confint[0][raw_name], confint[1][raw_name]) # Rename terms if raw_name == 'Intercept': # Intercept term (single term) term = '(Intercept)' terms[term] = SingleTerm(raw_name, beta, result.pvalues[raw_name]) else: # Parse if required factor, column, contrast = parse_patsy_term(formula, df, raw_name) if contrast is not None: # Categorical term if bool_baselevels is False and contrast == 'True' and set(df[column].unique()) == set([True, False]): # Treat as single term terms[column] = SingleTerm(raw_name, beta, result.pvalues[raw_name]) else: # Add a new categorical term if not exists if column not in terms: ref_category = formula_factor_ref_category(formula, df, factor) terms[column] = CategoricalTerm({}, ref_category) terms[column].categories[contrast] = SingleTerm(raw_name, beta, result.pvalues[raw_name]) else: # Single term terms[column] = SingleTerm(raw_name, beta, result.pvalues[raw_name]) # Fit null model (for llnull) if hasattr(result, 'llnull'): llnull = result.llnull else: result_null = model_class.from_formula(formula=dep + ' ~ 1', data=df).fit() llnull = result_null.llf # Parse raw regression results (to get fit method) header_entries = np.vectorize(str.strip)(np.concatenate(np.split(np.array(result.summary().tables[0].data), 2, axis=1))) header_dict = {x[0]: x[1] for x in header_entries} # Get full name to display if model_class is sm.Logit: full_name = 'Logistic Regression' else: full_name = '{} Regression'.format(model_class.__name__) if fit_kwargs.get('cov_type', 'nonrobust') != 'nonrobust': full_name = 'Robust {}'.format(full_name) return RegressionResult( result, full_name, model_class.__name__, header_dict['Method:'], dep, result.nobs, result.df_model, datetime.now(), result.cov_type, terms, result.llf, llnull, getattr(result, 'df_resid', None), getattr(result, 'rsquared', None), getattr(result, 'fvalue', None), exp ) def logit_then_regress(model_class, df, dep, formula, *, nan_policy='warn', **kwargs): """ Perform logistic regression, then use parameters as start parameters for desired regression :param model_class: Type of regression model to fit :type model_class: statsmodels model class :param df: Data to perform regression on :type df: DataFrame :param dep: Column in *df* for the dependent variable (numeric) :type dep: str :param formula: Patsy formula for the regression model :type formula: str :param nan_policy: How to handle *nan* values (see :ref:`nan-handling`) :type nan_policy: str :param kwargs: Passed through to :func:`yli.regress` :rtype: :class:`yli.regress.RegressionResult` """ # Check for/clean NaNs # Do this once here so we only get 1 warning df = df[[dep] + cols_for_formula(formula, df)] df = check_nan(df, nan_policy) # Perform logistic regression logit_result = regress(sm.Logit, df, dep, formula, **kwargs) logit_params = logit_result.raw_result.params # Check convergence if not logit_result.raw_result.mle_retvals['converged']: return None # Perform desired regression return regress(model_class, df, dep, formula, start_params=logit_params, **kwargs) # ----------------------------- # Penalised logistic regression class PenalisedLogit(statsmodels.discrete.discrete_model.BinaryModel): """ statsmodel-compatible model for computing Firth penalised logistic regression Uses the R *logistf* library. This class should be used in conjunction with :func:`yli.regress`. """ def fit(self, disp=False): import rpy2.robjects as ro import rpy2.robjects.packages import rpy2.robjects.pandas2ri # Assume data is already cleaned from regress() df = self.data.frame.copy() # Convert bool to int otherwise rpy2 chokes df = df.replace({False: 0, True: 1}) # Import logistf ro.packages.importr('logistf') with ro.conversion.localconverter(ro.default_converter + ro.pandas2ri.converter): with ro.local_context() as lc: # Convert DataFrame to R lc['df'] = df # Transfer other parameters to R lc['formula_'] = self.formula lc['alpha'] = config.alpha # Fit the model model = ro.r('logistf(formula_, data=df, alpha=alpha)') # TODO: Handle categorical terms? terms = {t: SingleTerm(t, Estimate(b, ci0, ci1), p) for t, b, ci0, ci1, p in zip(model['terms'], model['coefficients'], model['ci.lower'], model['ci.upper'], model['prob'])} return RegressionResult( model, 'Penalised Logistic Regression', 'Logit', 'Penalised ML', self.endog_names, model['n'][0], model['df'][0], datetime.now(), 'nonrobust', terms, model['loglik'][0], model['loglik'][1], None, None, None, None # Set exp in regress() )