# scipy-yli: Helpful SciPy utilities and recipes # Copyright © 2022 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import numpy as np import pandas as pd import patsy from scipy import stats from scipy.special import expit import statsmodels, statsmodels.miscmodels.ordinal_model import statsmodels.api as sm from statsmodels.iolib.table import SimpleTable from statsmodels.stats.outliers_influence import variance_inflation_factor from tqdm import tqdm from datetime import datetime import itertools import warnings from .bayes_factors import BayesFactor, bayesfactor_afbf from .config import config from .sig_tests import FTestResult from .utils import Estimate, PValueStyle, check_nan, cols_for_formula, convert_pandas_nullable, fmt_p, formula_factor_ref_category, parse_patsy_term def vif(df, formula=None, *, nan_policy='warn'): """ Calculate the variance inflation factor (VIF) for each variable in *df* :param df: Data to calculate the VIF for :type df: DataFrame :param formula: If specified, calculate the VIF only for the variables in the formula :type formula: str :return: The variance inflation factors :rtype: Series **Example:** .. code-block:: df = pd.DataFrame({ 'D': [68.58, 67.33, 67.33, ...], 'T1': [14, 10, 10, ...], 'T2': [46, 73, 85, ...], ... }) yli.vif(df[['D', 'T1', 'T2', ...]]) .. code-block:: text D 8.318301 T1 6.081590 T2 2.457122 ... dtype: float64 The output shows the variance inflation factor for each variable in *df*. """ if formula: # Only consider columns in the formula df = df[cols_for_formula(formula, df)] # Check for/clean NaNs df = check_nan(df, nan_policy) # Convert all to float64 otherwise statsmodels chokes with "ufunc 'isfinite' not supported for the input types ..." df = pd.get_dummies(df, drop_first=True) # Convert categorical dtypes df = df.astype('float64') # Convert all other dtypes # Add intercept column orig_columns = list(df.columns) df['Intercept'] = [1] * len(df) vifs = {} for i, col in enumerate(orig_columns): vifs[col] = variance_inflation_factor(df, i) return pd.Series(vifs) # ---------- # Regression class LikelihoodRatioTestResult: """ Result of a likelihood ratio test for regression See :meth:`RegressionResult.lrtest_null`. """ def __init__(self, statistic, dof, pvalue): #: Likelihood ratio test statistic (*float*) self.statistic = statistic #: Degrees of freedom for the likelihood ratio test statistic (*int*) self.dof = dof #: *p* value for the likelihood ratio test (*float*) self.pvalue = pvalue def __repr__(self): if config.repr_is_summary: return self.summary() return super().__repr__() def _repr_html_(self): return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION | PValueStyle.HTML)) def summary(self): """ Return a stringified summary of the likelihood ratio test :rtype: str """ return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION)) class RegressionResult: """ Result of a regression See :func:`yli.regress`. """ def __init__(self, raw_result, full_name, model_name, fit_method, dep, nobs, dof_model, fitted_dt, cov_type, terms, llf, llnull, dof_resid, rsquared, f_statistic, comments, exp ): #: Raw result from statsmodels *model.fit* self.raw_result = raw_result # Information for display #: Full name of the regression model type (*str*) self.full_name = full_name #: Short name of the regression model type (*str*) self.model_name = model_name #: Method for fitting the regression model (*str*) self.fit_method = fit_method # Basic fitted model information #: Name of the dependent variable (*str*) self.dep = dep #: Number of observations (*int*) self.nobs = nobs #: Degrees of freedom for the model (*int*) self.dof_model = dof_model #: Date and time of fitting the model (Python *datetime*) self.fitted_dt = fitted_dt #: Method for computing the covariance matrix (*str*) self.cov_type = cov_type # Regression coefficients/p values #: Coefficients and *p* values for each term in the model (*dict* of :class:`SingleTerm` or :class:`CategoricalTerm`) self.terms = terms # Model log-likelihood #: Log-likelihood of fitted model (*float*) self.llf = llf #: Log-likelihood of null model (*float*) self.llnull = llnull # Extra statistics (not all regression models have these) #: Degrees of freedom for the residuals (*int*; *None* if N/A) self.dof_resid = dof_resid #: *R*:sup:`2` statistic (*float*; *None* if N/A) self.rsquared = rsquared #: *F* statistic (*float*; *None* if N/A) self.f_statistic = f_statistic #: Comments for the model (*List[str]*) self.comments = comments or [] # Config for display style #: See :func:`yli.regress` self.exp = exp @property def pseudo_rsquared(self): """McFadden's pseudo *R*:sup:`2` statistic""" return 1 - self.llf/self.llnull def lrtest_null(self): """ Compute the likelihood ratio test comparing the model to the null model :rtype: :class:`LikelihoodRatioTestResult` """ statistic = -2 * (self.llnull - self.llf) pvalue = 1 - stats.chi2.cdf(statistic, self.dof_model) return LikelihoodRatioTestResult(statistic, self.dof_model, pvalue) def ftest(self): """ Perform the *F* test that all slopes are 0 :rtype: :class:`yli.sig_tests.FTestResult` """ pvalue = 1 - stats.f(self.dof_model, self.dof_resid).cdf(self.f_statistic) return FTestResult(self.f_statistic, self.dof_model, self.dof_resid, pvalue) def bayesfactor_beta_zero(self, term): """ Compute a Bayes factor testing the hypothesis that the given beta is 0 Uses the R *BFpack* library. Requires the regression to be from statsmodels. The term must be specified as the *raw name* from the statsmodels regression, available via :attr:`RegressionResult.raw_result`. :param term: Raw name of the term to be tested :type term: str :rtype: :class:`yli.bayes_factors.BayesFactor` """ # FIXME: Allow specifying our renamed terms # Get parameters required for AFBF params = pd.Series({raw_name.replace('[', '_').replace(']', '_'): beta for raw_name, beta in self.raw_result.params.items()}) cov = self.raw_result.cov_params() # Compute BF matrix bf01 = bayesfactor_afbf(params, cov, self.nobs, '{} = 0'.format(term.replace('[', '_').replace(']', '_'))) bf01 = BayesFactor(bf01.factor, '0', '{} = 0'.format(term), '1', '{} ≠ 0'.format(term)) if bf01.factor >= 1: return bf01 else: return bf01.invert() def _header_table(self, html): """Return the entries for the header table""" # Left column left_col = [] left_col.append(('Dep. Variable:', self.dep)) left_col.append(('Model:', self.model_name)) left_col.append(('Method:', self.fit_method)) left_col.append(('Date:', self.fitted_dt.strftime('%Y-%m-%d'))) left_col.append(('Time:', self.fitted_dt.strftime('%H:%M:%S'))) if self.cov_type: left_col.append(('Std. Errors:', 'Non-Robust' if self.cov_type == 'nonrobust' else self.cov_type.upper() if self.cov_type.startswith('hc') else self.cov_type)) # Right column right_col = [] right_col.append(('No. Observations:', format(self.nobs, '.0f'))) right_col.append(('Df. Model:', format(self.dof_model, '.0f'))) if self.dof_resid: right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f'))) if self.rsquared: right_col.append(('R2:' if html else 'R²:', format(self.rsquared, '.2f'))) else: right_col.append(('Pseudo R2:' if html else 'Pseudo R²:', format(self.pseudo_rsquared, '.2f'))) if self.f_statistic: # Report the F test if available f_result = self.ftest() if html: right_col.append(('F:', format(f_result.statistic, '.2f'))) right_col.append(('p (F):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML))) else: right_col.append(('F:', format(f_result.statistic, '.2f'))) right_col.append(('p (F):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY))) else: # Otherwise report likelihood ratio test as overall test lrtest_result = self.lrtest_null() right_col.append(('LL-Model:', format(self.llf, '.2f'))) right_col.append(('LL-Null:', format(self.llnull, '.2f'))) if html: right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML))) else: right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY))) return left_col, right_col def __repr__(self): if config.repr_is_summary: return self.summary() return super().__repr__() def _repr_html_(self): # Render header table left_col, right_col = self._header_table(html=True) out = ''.format(self.full_name) for left_cell, right_cell in itertools.zip_longest(left_col, right_col): out += ''.format( left_cell[0] if left_cell else '', left_cell[1] if left_cell else '', right_cell[0] if right_cell else '', right_cell[1] if right_cell else '' ) out += '
{} Results
{}{}{}{}
' # Render coefficients table out += ''.format('exp(β)' if self.exp else 'β', (1-config.alpha)*100) for term_name, term in self.terms.items(): if isinstance(term, SingleTerm): # Single term # Exponentiate beta if requested beta = term.beta if self.exp: beta = np.exp(beta) out += ''.format(term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML)) elif isinstance(term, CategoricalTerm): # Categorical term out += ''.format(term_name) # Render reference category if term.ref_category is not None: out += ''.format(term.ref_category) # Loop over terms for sub_term_name, sub_term in term.categories.items(): # Exponentiate beta if requested beta = sub_term.beta if self.exp: beta = np.exp(beta) out += ''.format(sub_term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(sub_term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML)) else: raise Exception('Attempt to render unknown term type') out += '
{}({:g}% CI)p
{}{:.2f}({:.2f}{:.2f}){}
{}
{}Ref.
{}{:.2f}({:.2f}{:.2f}){}
' # TODO: Have a detailed view which shows SE, t/z, etc. if self.comments: out += '
    ' for comment in self.comments: out += '
  1. {}
  2. '.format(comment) out += '
' return out def summary(self): """ Return a stringified summary of the regression model :rtype: str """ # Render header table left_col, right_col = self._header_table(html=False) # Ensure equal sizes for SimpleTable if len(right_col) > len(left_col): left_col.extend([('', '')] * (len(right_col) - len(left_col))) elif len(left_col) > len(right_col): right_col.extend([('', '')] * (len(left_col) - len(right_col))) table1 = SimpleTable(np.concatenate([left_col, right_col], axis=1), title='{} Results'.format(self.full_name)) table1.insert_stubs(2, [' | '] * len(left_col)) # Get rid of last line (merge with next table) table1_lines = table1.as_text().splitlines(keepends=False) out = '\n'.join(table1_lines[:-1]) + '\n' # Render coefficients table table_data = [] for term_name, term in self.terms.items(): if isinstance(term, SingleTerm): # Single term # Exponentiate beta if requested beta = term.beta if self.exp: beta = np.exp(beta) # Add some extra padding table_data.append([term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(term.pvalue, PValueStyle.TABULAR)]) elif isinstance(term, CategoricalTerm): # Categorical term table_data.append([term_name + ' ', '', '', '', '', '']) # Render reference category if term.ref_category is not None: table_data.append(['{} '.format(term.ref_category), 'Ref.', '', '', '', '']) # Loop over terms for sub_term_name, sub_term in term.categories.items(): # Exponentiate beta if requested beta = sub_term.beta if self.exp: beta = np.exp(beta) table_data.append([sub_term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(sub_term.pvalue, PValueStyle.TABULAR)]) else: raise Exception('Attempt to render unknown term type') table2 = SimpleTable(data=table_data, headers=['', 'exp(β)' if self.exp else 'β', '', '\ue000', '', ' p']) # U+E000 is in Private Use Area, mark middle of CI column table2_text = table2.as_text().replace(' \ue000 ', '({:g}% CI)'.format((1-config.alpha)*100)) # Render heading in the right spot table2_lines = table2_text.splitlines(keepends=False) # Render divider line between 2 tables max_table_len = max(len(table1_lines[-1]), len(table2_lines[-1])) out += '=' * max_table_len + '\n' out += '\n'.join(table2_lines[1:]) if self.comments: out += '\n' for i, comment in enumerate(self.comments): out += '\n{}. {}'.format(i + 1, comment) return out class SingleTerm: """A term in a :class:`RegressionResult` which is a single term""" def __init__(self, raw_name, beta, pvalue): #: Raw name of the term (*str*; e.g. in :attr:`RegressionResult.raw_result`) self.raw_name = raw_name #: :class:`yli.utils.Estimate` of the coefficient self.beta = beta #: *p* value for the coefficient (*float*) self.pvalue = pvalue class CategoricalTerm: """A group of terms in a :class:`RegressionResult` corresponding to a categorical variable""" def __init__(self, categories, ref_category): #: Terms for each of the categories, excluding the reference category (*dict* of :class:`SingleTerm`) self.categories = categories #: Name of the reference category (*str*) self.ref_category = ref_category def regress( model_class, df, dep, formula, *, nan_policy='warn', model_kwargs=None, fit_kwargs=None, family=None, # common model_kwargs cov_type=None, method=None, maxiter=None, start_params=None, # common fit_kwargs bool_baselevels=False, exp=None, _dmatrices=None, ): """ Fit a statsmodels regression model :param model_class: Type of regression model to fit :type model_class: statsmodels model class :param df: Data to perform regression on :type df: DataFrame :param dep: Column in *df* for the dependent variable (numeric) :type dep: str :param formula: Patsy formula for the regression model :type formula: str :param nan_policy: How to handle *nan* values (see :ref:`nan-handling`) :type nan_policy: str :param model_kwargs: Keyword arguments to pass to *model_class* constructor :type model_kwargs: dict :param fit_kwargs: Keyword arguments to pass to *model.fit* :type fit_kwargs: dict :param family: See statsmodels *GLM* constructor :param cov_type: See statsmodels *model.fit* :param method: See statsmodels *model.fit* :param maxiter: See statsmodels *model.fit* :param start_params: See statsmodels *model.fit* :param bool_baselevels: Show reference categories for boolean independent variables even if reference category is *False* :type bool_baselevels: bool :param exp: Report exponentiated parameters rather than raw parameters, default (*None*) is to autodetect based on *model_class* :type exp: bool :rtype: :class:`yli.regress.RegressionResult` **Example:** .. code-block:: df = pd.DataFrame({ 'Unhealthy': [False, False, False, ...], 'Fibrinogen': [2.52, 2.46, 2.29, ...], 'GammaGlobulin': [38, 36, 36, ...] }) yli.regress(sm.Logit, df, 'Unhealthy', 'Fibrinogen + GammaGlobulin') .. code-block:: text Logistic Regression Results ====================================================== Dep. Variable: Unhealthy | No. Observations: 32 Model: Logit | Df. Model: 2 Method: MLE | Df. Residuals: 29 Date: 2022-10-18 | Pseudo R²: 0.26 Time: 19:00:34 | LL-Model: -11.47 Std. Errors: Non-Robust | LL-Null: -15.44 | p (LR): 0.02* ====================================================== exp(β) (95% CI) p ----------------------------------------------- (Intercept) 0.00 (0.00 - 0.24) 0.03* Fibrinogen 6.80 (1.01 - 45.79) 0.049* GammaGlobulin 1.17 (0.92 - 1.48) 0.19 ----------------------------------------------- The output summarises the results of the regression. Note that the parameter estimates are automatically exponentiated. For example, the odds ratio for unhealthiness per unit increase in fibrinogen is 6.80, with 95% confidence interval 1.01–45.79, and is significant with *p* value 0.049. """ # Populate model_kwargs if model_kwargs is None: model_kwargs = {} if family is not None: model_kwargs['family'] = family # Populate fit_kwargs if fit_kwargs is None: fit_kwargs = {} if cov_type is not None: fit_kwargs['cov_type'] = cov_type if method is not None: fit_kwargs['method'] = method if maxiter is not None: fit_kwargs['maxiter'] = maxiter if start_params is not None: fit_kwargs['start_params'] = start_params # Autodetect whether to exponentiate if exp is None: if model_class in (sm.Logit, sm.Poisson, PenalisedLogit): exp = True elif model_class is OrdinalLogit: exp = True else: exp = False if _dmatrices is None: # Check for/clean NaNs df = df[[dep] + cols_for_formula(formula, df)] df = check_nan(df, nan_policy) # Ensure numeric type for dependent variable if df[dep].dtype != 'float64': df[dep] = df[dep].astype('float64') # Convert pandas nullable types for independent variables as this breaks statsmodels df = convert_pandas_nullable(df) # Construct design matrix from formula dmatrices = patsy.dmatrices(dep + ' ~ ' + formula, df, return_type='dataframe') else: dmatrices = _dmatrices if model_class is OrdinalLogit: # Drop explicit intercept term # FIXME: Check before dropping dmatrices = (dmatrices[0], dmatrices[1].iloc[:,1:]) # Fit model model = model_class(endog=dmatrices[0], exog=dmatrices[1], **model_kwargs) model.formula = dep + ' ~ ' + formula result = model.fit(disp=False, **fit_kwargs) if isinstance(result, RegressionResult): # Already processed! result.exp = exp return result # Check convergence if hasattr(result, 'mle_retvals') and not result.mle_retvals['converged']: warnings.warn('Maximum likelihood estimation failed to converge. Check raw_result.mle_retvals.') # Process terms terms = {} # Join term names manually because statsmodels wrapper is very slow #confint = result.conf_int(config.alpha) confint = {k: v for k, v in zip(model.exog_names, result._results.conf_int(config.alpha))} pvalues = {k: v for k, v in zip(model.exog_names, result._results.pvalues)} #for raw_name, raw_beta in result.params.items(): for raw_name, raw_beta in zip(model.exog_names, result._results.params): beta = Estimate(raw_beta, confint[raw_name][0], confint[raw_name][1]) # Rename terms if raw_name == 'Intercept': # Intercept term (single term) term = '(Intercept)' terms[term] = SingleTerm(raw_name, beta, pvalues[raw_name]) elif model_class is OrdinalLogit and '/' in raw_name: # Group ordinal regression cutoffs if '(Cutoffs)' not in terms: terms['(Cutoffs)'] = CategoricalTerm({}, None) terms['(Cutoffs)'].categories[raw_name] = SingleTerm(raw_name, beta, pvalues[raw_name]) else: # Parse if required factor, column, contrast = parse_patsy_term(formula, df, raw_name) if contrast is not None: # Categorical term if bool_baselevels is False and contrast == 'True' and set(df[column].unique()) == set([True, False]): # Treat as single term terms[column] = SingleTerm(raw_name, beta, pvalues[raw_name]) else: # Add a new categorical term if not exists if column not in terms: ref_category = formula_factor_ref_category(formula, df, factor) terms[column] = CategoricalTerm({}, ref_category) terms[column].categories[contrast] = SingleTerm(raw_name, beta, pvalues[raw_name]) else: # Single term terms[column] = SingleTerm(raw_name, beta, pvalues[raw_name]) # Fit null model (for llnull) if hasattr(result, 'llnull'): llnull = result.llnull else: # Construct null (intercept-only) model #result_null = model_class.from_formula(formula=dep + ' ~ 1', data=df).fit() dm_exog = pd.DataFrame(index=dmatrices[0].index) dm_exog['Intercept'] = pd.Series(dtype='float64') dm_exog['Intercept'].fillna(1, inplace=True) result_null = model_class(endog=dmatrices[0], exog=dm_exog).fit() llnull = result_null.llf if model_class is sm.OLS: method_name = 'Least Squares' else: # Parse raw regression results to get fit method # Avoid this in general as it can be expensive to summarise all the post hoc tests, etc. header_entries = np.vectorize(str.strip)(np.concatenate(np.split(np.array(result.summary().tables[0].data), 2, axis=1))) header_dict = {x[0]: x[1] for x in header_entries} method_name = header_dict['Method:'] # Get full name to display if model_class is sm.Logit: full_name = 'Logistic Regression' else: full_name = '{} Regression'.format(model_class.__name__) if fit_kwargs.get('cov_type', 'nonrobust') != 'nonrobust': full_name = 'Robust {}'.format(full_name) return RegressionResult( result, full_name, model_class.__name__, method_name, dep, result.nobs, result.df_model, datetime.now(), getattr(result, 'cov_type', 'nonrobust'), terms, result.llf, llnull, getattr(result, 'df_resid', None), getattr(result, 'rsquared', None), getattr(result, 'fvalue', None), [], exp ) def regress_bootstrap( model_class, df, dep, formula, *, nan_policy='warn', model_kwargs=None, fit_kwargs=None, samples=1000, **kwargs ): """ Fit a statsmodels regression model, using bootstrapping to compute confidence intervals and *p* values :param model_class: See :func:`yli.regress` :param df: See :func:`yli.regress` :param dep: See :func:`yli.regress` :param formula: See :func:`yli.regress` :param nan_policy: See :func:`yli.regress` :param model_kwargs: See :func:`yli.regress` :param fit_kwargs: See :func:`yli.regress` :param samples: Number of bootstrap samples to draw :type samples: int :param kwargs: Other arguments to pass to :func:`yli.regress` :rtype: :class:`yli.regress.RegressionResult` """ if model_kwargs is None: model_kwargs = {} if fit_kwargs is None: fit_kwargs = {} # Check for/clean NaNs # Following this, we pass nan_policy='raise' to assert no NaNs remaining df = df[[dep] + cols_for_formula(formula, df)] df = check_nan(df, nan_policy) # Ensure numeric type for dependent variable if df[dep].dtype != 'float64': df[dep] = df[dep].astype('float64') # Convert pandas nullable types for independent variables as this breaks statsmodels df = convert_pandas_nullable(df) # Precompute design matrices dmatrices = patsy.dmatrices(dep + ' ~ ' + formula, df, return_type='dataframe') # Fit full model full_model = regress(model_class, df, dep, formula, nan_policy='raise', _dmatrices=dmatrices, model_kwargs=model_kwargs, fit_kwargs=fit_kwargs, **kwargs) # Initialise bootstrap_results bootstrap_results = {} # Dict mapping term raw names to bootstrap betas for term in full_model.terms.values(): if isinstance(term, SingleTerm): bootstrap_results[term.raw_name] = [] else: for sub_term in term.categories.values(): bootstrap_results[sub_term.raw_name] = [] # Draw bootstrap samples and regress dmatrices = dmatrices[0].join(dmatrices[1]) for i in tqdm(range(samples)): bootstrap_rows = dmatrices.sample(len(df), replace=True) model = model_class(endog=bootstrap_rows.iloc[:,0], exog=bootstrap_rows.iloc[:,1:], **model_kwargs) model.formula = dep + ' ~ ' + formula result = model.fit(disp=False, **fit_kwargs) for raw_name, raw_beta in zip(model.exog_names, result._results.params): bootstrap_results[raw_name].append(raw_beta) # Combine bootstrap results terms = {} for term_name, term in full_model.terms.items(): if isinstance(term, SingleTerm): bootstrap_betas = bootstrap_results[term.raw_name] bootstrap_pvalue = sum(1 for b in bootstrap_betas if b < 0) / len(bootstrap_betas) bootstrap_pvalue = 2 * min(bootstrap_pvalue, 1 - bootstrap_pvalue) terms[term_name] = SingleTerm(term.raw_name, Estimate(term.beta.point, np.quantile(bootstrap_betas, config.alpha/2), np.quantile(bootstrap_betas, 1-config.alpha/2)), bootstrap_pvalue) else: categories = {} for sub_term_name, sub_term in term.categories.items(): bootstrap_betas = bootstrap_results[sub_term.raw_name] bootstrap_pvalue = sum(1 for b in bootstrap_betas if b < 0) / len(bootstrap_betas) bootstrap_pvalue = 2 * min(bootstrap_pvalue, 1 - bootstrap_pvalue) categories[sub_term_name] = SingleTerm(sub_term.raw_name, Estimate(sub_term.beta.point, np.quantile(bootstrap_betas, config.alpha/2), np.quantile(bootstrap_betas, 1-config.alpha/2)), bootstrap_pvalue) terms[term_name] = CategoricalTerm(categories, term.ref_category) return RegressionResult( None, full_model.full_name, full_model.model_name, full_model.fit_method, dep, full_model.nobs, full_model.dof_model, datetime.now(), 'Bootstrap', terms, full_model.llf, full_model.llnull, full_model.dof_resid, full_model.rsquared, full_model.f_statistic, full_model.comments, full_model.exp ) def logit_then_regress(model_class, df, dep, formula, *, nan_policy='warn', **kwargs): """ Perform logistic regression, then use parameters as start parameters for desired regression :param model_class: Type of regression model to fit :type model_class: statsmodels model class :param df: Data to perform regression on :type df: DataFrame :param dep: Column in *df* for the dependent variable (numeric) :type dep: str :param formula: Patsy formula for the regression model :type formula: str :param nan_policy: How to handle *nan* values (see :ref:`nan-handling`) :type nan_policy: str :param kwargs: Passed through to :func:`yli.regress` :rtype: :class:`yli.regress.RegressionResult` """ # Check for/clean NaNs # Do this once here so we only get 1 warning df = df[[dep] + cols_for_formula(formula, df)] df = check_nan(df, nan_policy) # Perform logistic regression logit_result = regress(sm.Logit, df, dep, formula, **kwargs) logit_params = logit_result.raw_result.params # Check convergence if not logit_result.raw_result.mle_retvals['converged']: return None # Perform desired regression return regress(model_class, df, dep, formula, start_params=logit_params, **kwargs) # ----------------------------- # Penalised logistic regression class PenalisedLogit(statsmodels.discrete.discrete_model.BinaryModel): """ statsmodels-compatible model for computing Firth penalised logistic regression Uses the R *logistf* library. This class should be used in conjunction with :func:`yli.regress`. **Example:** .. code-block:: df = pd.DataFrame({ 'Pred': [1] * 20 + [0] * 220, 'Outcome': [1] * 40 + [0] * 200 }) yli.regress(yli.PenalisedLogit, df, 'Outcome', 'Pred', exp=False) .. code-block:: text Penalised Logistic Regression Results ========================================================= Dep. Variable: Outcome | No. Observations: 240 Model: Logit | Df. Model: 1 Method: Penalised ML | Pseudo R²: 0.37 Date: 2022-10-19 | LL-Model: -66.43 Time: 07:50:40 | LL-Null: -105.91 Std. Errors: Non-Robust | p (LR): <0.001* ========================================================= β (95% CI) p --------------------------------------------- (Intercept) -2.28 (-2.77 - -1.85) <0.001* Pred 5.99 (3.95 - 10.85) <0.001* --------------------------------------------- The output summarises the result of the regression. The summary table shows that a penalised method was applied. Note that because `exp=False` was passed, the parameter estimates are not automatically exponentiated. """ def fit(self, disp=False): import rpy2.robjects as ro import rpy2.robjects.packages import rpy2.robjects.pandas2ri # Assume data is already cleaned from regress() df = self.data.orig_endog.join(self.data.orig_exog) # Convert bool to int otherwise rpy2 chokes df = df.replace({False: 0, True: 1}) # Import logistf ro.packages.importr('logistf') with ro.conversion.localconverter(ro.default_converter + ro.pandas2ri.converter): with ro.local_context() as lc: # Convert DataFrame to R lc['df'] = df # Transfer other parameters to R lc['formula_'] = self.formula lc['alpha'] = config.alpha # Fit the model model = ro.r('logistf(formula_, data=df, alpha=alpha)') # TODO: Handle categorical terms? terms = {t: SingleTerm(t, Estimate(b, ci0, ci1), p) for t, b, ci0, ci1, p in zip(model['terms'], model['coefficients'], model['ci.lower'], model['ci.upper'], model['prob'])} return RegressionResult( model, 'Penalised Logistic Regression', 'Logit', 'Penalised ML', self.endog_names, model['n'][0], model['df'][0], datetime.now(), 'nonrobust', terms, model['loglik'][0], model['loglik'][1], None, None, None, [], None # Set exp in regress() ) # ------------------------------------------------------ # Ordinal logistic regression (R/Stata parameterisation) class OrdinalLogit(statsmodels.miscmodels.ordinal_model.OrderedModel): """ statsmodels-compatible model for computing ordinal logistic (or probit) regression The implementation subclasses statsmodels' native *OrderedModel*, but substitutes an alternative parameterisation used by R and Stata. The the native statsmodels implementation, the first cutoff term is the true cutoff, and further cutoff terms are log differences between consecutive cutoffs. In this parameterisation, cutoff terms are represented directly in the model. """ def __init__(self, endog, exog, **kwargs): if 'distr' not in kwargs: kwargs['distr'] = 'logit' super().__init__(endog, exog, **kwargs) def transform_threshold_params(self, params): th_params = params[-(self.k_levels - 1):] thresh = np.concatenate(([-np.inf], th_params, [np.inf])) return thresh def transform_reverse_threshold_params(self, params): return params[:-1]