# scipy-yli: Helpful SciPy utilities and recipes # Copyright © 2022 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import numpy as np import pandas as pd from scipy import stats import statsmodels import statsmodels.api as sm from statsmodels.iolib.table import SimpleTable from statsmodels.stats.outliers_influence import variance_inflation_factor from datetime import datetime import itertools from .bayes_factors import BayesFactor, bayesfactor_afbf from .config import config from .sig_tests import FTestResult from .utils import Estimate, check_nan, cols_for_formula, fmt_p, formula_factor_ref_category, parse_patsy_term def vif(df, formula=None, nan_policy='warn'): """ Calculate the variance inflation factor for each variable in df formula: If specified, calculate the VIF only for the variables in the formula """ if formula: # Only consider columns in the formula df = df[cols_for_formula(formula, df)] # Check for/clean NaNs df = check_nan(df, nan_policy) # Convert all to float64 otherwise statsmodels chokes with "ufunc 'isfinite' not supported for the input types ..." df = pd.get_dummies(df, drop_first=True) # Convert categorical dtypes df = df.astype('float64') # Convert all other dtypes # Add intercept column orig_columns = list(df.columns) df['Intercept'] = [1] * len(df) vifs = {} for i, col in enumerate(orig_columns): vifs[col] = variance_inflation_factor(df, i) return pd.Series(vifs) # ---------- # Regression class LikelihoodRatioTestResult: """Result of a likelihood ratio test for regression""" def __init__(self, statistic, dof, pvalue): self.statistic = statistic self.dof = dof self.pvalue = pvalue def _repr_html_(self): return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, html=True)) def summary(self): return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, html=False)) class RegressionResult: """ Result of a regression llf/llnull: Log-likelihood of model/null model """ def __init__(self, raw_result, full_name, model_name, fit_method, dep, nobs, dof_model, fitted_dt, terms, llf, llnull, dof_resid, rsquared, f_statistic, exp ): # A copy of the raw result so we can access it self.raw_result = raw_result # Information for display self.full_name = full_name self.model_name = model_name self.fit_method = fit_method # Basic fitted model information self.dep = dep self.nobs = nobs self.dof_model = dof_model self.fitted_dt = fitted_dt # Regression coefficients/p values self.terms = terms # Model log-likelihood self.llf = llf self.llnull = llnull # Extra statistics (not all regression models have these) self.dof_resid = dof_resid self.rsquared = rsquared self.f_statistic = f_statistic # Config for display style self.exp = exp @property def pseudo_rsquared(self): """McFadden's pseudo R-squared""" return 1 - self.llf/self.llnull def lrtest_null(self): """Compute the likelihood ratio test comparing the model to the null model""" statistic = -2 * (self.llnull - self.llf) pvalue = 1 - stats.chi2.cdf(statistic, self.dof_model) return LikelihoodRatioTestResult(statistic, self.dof_model, pvalue) def ftest(self): """Perform the F test that all slopes are 0""" pvalue = 1 - stats.f(self.dof_model, self.dof_resid).cdf(self.f_statistic) return FTestResult(self.f_statistic, self.dof_model, self.dof_resid, pvalue) def bayesfactor_beta_zero(self, term): """ Compute Bayes factor testing the hypothesis that the given beta is 0 Requires statsmodels regression term: Raw name (from statsmodels, available via RegressionResult.raw_result) of term to be tested """ # FIXME: Allow specifying our renamed terms # Get parameters required for AFBF params = pd.Series({raw_name.replace('[', '_').replace(']', '_'): beta for raw_name, beta in self.raw_result.params.items()}) cov = self.raw_result.cov_params() # Compute BF matrix bf01 = bayesfactor_afbf(params, cov, self.nobs, '{} = 0'.format(term.replace('[', '_').replace(']', '_'))) bf01 = BayesFactor(bf01.factor, '0', '{} = 0'.format(term), '1', '{} ≠ 0'.format(term)) if bf01.factor >= 1: return bf01 else: return bf01.invert() def _header_table(self, html): """Return the entries for the header table""" # Left column left_col = [] left_col.append(('Dep. Variable:', self.dep)) left_col.append(('Model:', self.model_name)) left_col.append(('Method:', self.fit_method)) left_col.append(('Date:', self.fitted_dt.strftime('%Y-%m-%d'))) left_col.append(('Time:', self.fitted_dt.strftime('%H:%M:%S'))) left_col.append(('No. Observations:', format(self.nobs, '.0f'))) # Right column right_col = [] right_col.append(('Df. Model:', format(self.dof_model, '.0f'))) if self.dof_resid: right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f'))) if self.rsquared: right_col.append(('R2:' if html else 'R²:', format(self.rsquared, '.2f'))) else: right_col.append(('Pseudo R2:' if html else 'Pseudo R²:', format(self.pseudo_rsquared, '.2f'))) if self.f_statistic: # Report the F test if available f_result = self.ftest() if html: right_col.append(('F:', format(f_result.statistic, '.2f'))) right_col.append(('p (F):', fmt_p(f_result.pvalue, html=True, tabular=True))) else: right_col.append(('F:', format(f_result.statistic, '.2f'))) right_col.append(('p (F):', fmt_p(f_result.pvalue, html=False, tabular=True))) else: # Otherwise report likelihood ratio test as overall test lrtest_result = self.lrtest_null() right_col.append(('LL-Model:', format(self.llf, '.2f'))) right_col.append(('LL-Null:', format(self.llnull, '.2f'))) if html: right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, html=True, tabular=True))) else: right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, html=False, tabular=True))) return left_col, right_col def _repr_html_(self): # Render header table left_col, right_col = self._header_table(html=True) out = ''.format(self.full_name) for left_cell, right_cell in itertools.zip_longest(left_col, right_col): out += ''.format( left_cell[0] if left_cell else '', left_cell[1] if left_cell else '', right_cell[0] if right_cell else '', right_cell[1] if right_cell else '' ) out += '
{} Results
{}{}{}{}
' # Render coefficients table out += ''.format('exp(β)' if self.exp else 'β', (1-config.alpha)*100) for term_name, term in self.terms.items(): if isinstance(term, SingleTerm): # Single term # Exponentiate beta if requested beta = term.beta if self.exp: beta = np.exp(beta) out += ''.format(term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(term.pvalue, html=True, tabular=True)) elif isinstance(term, CategoricalTerm): # Categorical term out += ''.format(term_name) # Render reference category out += ''.format(term.ref_category) # Loop over terms for sub_term_name, sub_term in term.categories.items(): # Exponentiate beta if requested beta = sub_term.beta if self.exp: beta = np.exp(beta) out += ''.format(sub_term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(sub_term.pvalue, html=True, tabular=True)) else: raise Exception('Attempt to render unknown term type') out += '
{}({:g}% CI)p
{}{:.2f}({:.2f}{:.2f}){}
{}
{}Ref.
{}{:.2f}({:.2f}{:.2f}){}
' # TODO: Have a detailed view which shows SE, t/z, etc. return out def summary(self): # Render header table left_col, right_col = self._header_table(html=False) # Ensure equal sizes for SimpleTable if len(right_col) > len(left_col): left_col.extend([('', '')] * (len(right_col) - len(left_col))) elif len(left_col) > len(right_col): right_col.extend([('', '')] * (len(left_col) - len(right_col))) table1 = SimpleTable(np.concatenate([left_col, right_col], axis=1), title='{} Results'.format(self.full_name)) table1.insert_stubs(2, [' | '] * len(left_col)) # Get rid of last line (merge with next table) table1_lines = table1.as_text().splitlines(keepends=False) out = '\n'.join(table1_lines[:-1]) + '\n' # Render coefficients table table_data = [] for term_name, term in self.terms.items(): if isinstance(term, SingleTerm): # Single term # Exponentiate beta if requested beta = term.beta if self.exp: beta = np.exp(beta) # Add some extra padding table_data.append([term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(term.pvalue, html=False, tabular=True)]) elif isinstance(term, CategoricalTerm): # Categorical term table_data.append([term_name + ' ', '', '', '', '', '']) # Render reference category table_data.append(['{} '.format(term.ref_category), 'Ref.', '', '', '', '']) # Loop over terms for sub_term_name, sub_term in term.categories.items(): # Exponentiate beta if requested beta = sub_term.beta if self.exp: beta = np.exp(beta) table_data.append([sub_term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(sub_term.pvalue, html=False, tabular=True)]) else: raise Exception('Attempt to render unknown term type') table2 = SimpleTable(data=table_data, headers=['', 'exp(β)' if self.exp else 'β', '', '\ue000', '', ' p']) # U+E000 is in Private Use Area, mark middle of CI column table2_text = table2.as_text().replace(' \ue000 ', '({:g}% CI)'.format((1-config.alpha)*100)) # Render heading in the right spot table2_lines = table2_text.splitlines(keepends=False) # Render divider line between 2 tables max_table_len = max(len(table1_lines[-1]), len(table2_lines[-1])) out += '=' * max_table_len + '\n' out += '\n'.join(table2_lines[1:]) return out class SingleTerm: """ A term in a RegressionResult which is a single term raw_name: The raw name of the term (e.g. in RegressionResult.raw_result data) beta: An Estimate of the coefficient pvalue: The p value """ def __init__(self, raw_name, beta, pvalue): self.raw_name = raw_name self.beta = beta self.pvalue = pvalue class CategoricalTerm: """A group of terms in a RegressionResult corresponding to a categorical variable""" def __init__(self, categories, ref_category): self.categories = categories self.ref_category = ref_category def regress( model_class, df, dep, formula, *, nan_policy='warn', model_kwargs=None, fit_kwargs=None, family=None, # common model_kwargs cov_type=None, maxiter=None, start_params=None, # common fit_kwargs bool_baselevels=False, exp=None ): """ Fit a statsmodels regression model bool_baselevels: Show reference categories for boolean independent variables even if reference category is False exp: Report exponentiated parameters rather than raw parameters """ # Populate model_kwargs if model_kwargs is None: model_kwargs = {} if family is not None: model_kwargs['family'] = family # Populate fit_kwargs if fit_kwargs is None: fit_kwargs = {} if cov_type is not None: fit_kwargs['cov_type'] = cov_type if maxiter is not None: fit_kwargs['maxiter'] = maxiter if start_params is not None: fit_kwargs['start_params'] = start_params # Autodetect whether to exponentiate if exp is None: if model_class in (sm.Logit, sm.Poisson, PenalisedLogit): exp = True else: exp = False # Check for/clean NaNs df = df[[dep] + cols_for_formula(formula, df)] df = check_nan(df, nan_policy) # Ensure numeric type for dependent variable if df[dep].dtype != 'float64': df[dep] = df[dep].astype('float64') # Convert pandas nullable types for independent variables for col in df.columns: if df[col].dtype == 'Int64': df[col] = df[col].astype('float64') # Fit model model = model_class.from_formula(formula=dep + ' ~ ' + formula, data=df, **model_kwargs) result = model.fit(disp=False, **fit_kwargs) if isinstance(result, RegressionResult): # Already processed! result.exp = exp return result # Check convergence if hasattr(result, 'mle_retvals') and not result.mle_retvals['converged']: warnings.warn('Maximum likelihood estimation failed to converge. Check raw_result.mle_retvals.') # Process terms terms = {} confint = result.conf_int(config.alpha) for raw_name, raw_beta in result.params.items(): beta = Estimate(raw_beta, confint[0][raw_name], confint[1][raw_name]) # Rename terms if raw_name == 'Intercept': # Intercept term (single term) term = '(Intercept)' terms[term] = SingleTerm(raw_name, beta, result.pvalues[raw_name]) else: # Parse if required factor, column, contrast = parse_patsy_term(formula, df, raw_name) if contrast is not None: # Categorical term if bool_baselevels is False and contrast == 'True' and set(df[column].unique()) == set([True, False]): # Treat as single term terms[column] = SingleTerm(raw_name, beta, result.pvalues[raw_name]) else: # Add a new categorical term if not exists if column not in terms: ref_category = formula_factor_ref_category(formula, df, factor) terms[column] = CategoricalTerm({}, ref_category) terms[column].categories[contrast] = SingleTerm(raw_name, beta, result.pvalues[raw_name]) else: # Single term terms[column] = SingleTerm(raw_name, beta, result.pvalues[raw_name]) # Fit null model (for llnull) if hasattr(result, 'llnull'): llnull = result.llnull else: result_null = model_class.from_formula(formula=dep + ' ~ 1', data=df).fit() llnull = result_null.llf # Parse raw regression results (to get fit method) header_entries = np.vectorize(str.strip)(np.concatenate(np.split(np.array(result.summary().tables[0].data), 2, axis=1))) header_dict = {x[0]: x[1] for x in header_entries} # Get full name to display if model_class is sm.Logit: full_name = 'Logistic Regression' else: full_name = '{} Regression'.format(model_class.__name__) if fit_kwargs.get('cov_type', 'nonrobust') != 'nonrobust': full_name = 'Robust {}'.format(full_name) return RegressionResult( result, full_name, model_class.__name__, header_dict['Method:'], dep, result.nobs, result.df_model, datetime.now(), terms, result.llf, llnull, getattr(result, 'df_resid', None), getattr(result, 'rsquared', None), getattr(result, 'fvalue', None), exp ) def logit_then_regress(model_class, df, dep, formula, *, nan_policy='warn', **kwargs): """Perform logistic regression, then use parameters as start parameters for desired regression""" # Check for/clean NaNs # Do this once here so we only get 1 warning df = df[[dep] + cols_for_formula(formula, df)] df = check_nan(df, nan_policy) # Perform logistic regression logit_result = regress(sm.Logit, df, dep, formula, **kwargs) logit_params = logit_result.raw_result.params # Check convergence if not logit_result.raw_result.mle_retvals['converged']: return None # Perform desired regression return regress(model_class, df, dep, formula, start_params=logit_params, **kwargs) # ----------------------------- # Penalised logistic regression class PenalisedLogit(statsmodels.discrete.discrete_model.BinaryModel): """ Statsmodel-compatible model for computing Firth penalised logistic regression Uses R "logistf" library NB: This class expects to be used in the context of yli.regress() """ def fit(self): import rpy2.robjects as ro import rpy2.robjects.packages import rpy2.robjects.pandas2ri # Assume data is already cleaned from regress() df = self.data.frame.copy() # Convert bool to int otherwise rpy2 chokes df = df.replace({False: 0, True: 1}) # Import logistf ro.packages.importr('logistf') with ro.conversion.localconverter(ro.default_converter + ro.pandas2ri.converter): with ro.local_context() as lc: # Convert DataFrame to R lc['df'] = df # Transfer other parameters to R lc['formula_'] = self.formula lc['alpha'] = config.alpha # Fit the model model = ro.r('logistf(formula_, data=df, alpha=alpha)') # TODO: Handle categorical terms? terms = {t: SingleTerm(t, Estimate(b, ci0, ci1), p) for t, b, ci0, ci1, p in zip(model['terms'], model['coefficients'], model['ci.lower'], model['ci.upper'], model['prob'])} return RegressionResult( model, 'Penalised Logistic Regression', 'Logit', 'Penalised ML', self.endog_names, model['n'][0], model['df'][0], datetime.now(), terms, model['loglik'][0], model['loglik'][1], None, None, None, None # Set exp in regress() )