# scipy-yli: Helpful SciPy utilities and recipes # Copyright © 2022–2023 Lee Yingtong Li (RunasSudo) # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import numpy as np import pandas as pd import patsy from scipy import stats from scipy.special import expit import statsmodels, statsmodels.base.model, statsmodels.miscmodels.ordinal_model import statsmodels.api as sm from statsmodels.iolib.table import SimpleTable from statsmodels.stats.outliers_influence import variance_inflation_factor from datetime import datetime import io import itertools import json import subprocess import weakref from .bayes_factors import BayesFactor, bayesfactor_afbf from .config import config from .shap import ShapResult from .sig_tests import ChiSquaredResult, FTestResult from .utils import Estimate, PValueStyle, as_numeric, check_nan, cols_for_formula, convert_pandas_nullable, fmt_p, formula_factor_ref_category, parse_patsy_term # TODO: Documentation # TODO: Bootstrap def vif(df, formula=None, *, nan_policy='warn'): """ Calculate the variance inflation factor (VIF) for each variable in *df* :param df: Data to calculate the VIF for :type df: DataFrame :param formula: If specified, calculate the VIF only for the variables in the formula :type formula: str :return: The variance inflation factors :rtype: Series **Example:** .. code-block:: df = pd.DataFrame({ 'D': [68.58, 67.33, 67.33, ...], 'T1': [14, 10, 10, ...], 'T2': [46, 73, 85, ...], ... }) yli.vif(df[['D', 'T1', 'T2', ...]]) .. code-block:: text D 8.318301 T1 6.081590 T2 2.457122 ... dtype: float64 The output shows the variance inflation factor for each variable in *df*. """ if formula: # Only consider columns in the formula df = df[cols_for_formula(formula, df)] # Check for/clean NaNs df = check_nan(df, nan_policy) # Convert all to float64 otherwise statsmodels chokes with "ufunc 'isfinite' not supported for the input types ..." df = pd.get_dummies(df, drop_first=True) # Convert categorical dtypes df = df.astype('float64') # Convert all other dtypes # Add intercept column orig_columns = list(df.columns) df['Intercept'] = [1] * len(df) vifs = {} for i, col in enumerate(orig_columns): vifs[col] = variance_inflation_factor(df, i) return pd.Series(vifs) def regress( model_class, df, dep, formula, *, nan_policy='warn', exposure=None, family=None, status=None, method=None, maxiter=None, start_params=None, tolerance=None, reduced=None, bool_baselevels=False, exp=None ): """ Fit a statsmodels regression model :param model_class: Type of regression model to fit :type model_class: :class:`yli.regress.RegressionModel` subclass :param df: Data to perform regression on :type df: DataFrame :param dep: Column(s) in *df* for the dependent variable (numeric) :type dep: str :param formula: Patsy formula for the regression model :type formula: str :param nan_policy: How to handle *nan* values (see :ref:`nan-handling`) :type nan_policy: str :param exposure: Column in *df* for the exposure variable (numeric, some models only) :type exposure: str :param family: See :class:`yli.GLM` :type family: str :param status: See :class:`yli.Cox` :type status: str :param method: See statsmodels *model.fit* :param maxiter: See statsmodels *model.fit* :param start_params: See statsmodels *model.fit* :param tolerance: See statsmodels *model.fit* :param reduced: See :meth:`yli.IntervalCensoredCox` :param bool_baselevels: Show reference categories for boolean independent variables even if reference category is *False* :type bool_baselevels: bool :param exp: Report exponentiated parameters rather than raw parameters, default (*None*) is to autodetect based on *model_class* :type exp: bool :rtype: :class:`yli.regress.RegressionModel` **Example:** See :class:`yli.OLS`, :class:`yli.Logit`, etc. """ if not any(x.__name__ == 'RegressionModel' for x in model_class.__bases__): raise ValueError('model_class must be a RegressionModel') # Additional columns to check for NaN - exposure, status, etc. additional_columns = [] # Build function call arguments fit_kwargs = {} if exposure is not None: additional_columns.append(exposure) if family is not None: fit_kwargs['family'] = family if status is not None: additional_columns.append(status) if method is not None: fit_kwargs['method'] = method if maxiter is not None: fit_kwargs['maxiter'] = maxiter if start_params is not None: fit_kwargs['start_params'] = start_params if tolerance is not None: fit_kwargs['tolerance'] = tolerance if reduced is not None: fit_kwargs['reduced'] = reduced # Bodge for TimeVaryingCox if model_class.__name__ == 'TimeVaryingCox': additional_columns.append('index') additional_columns.append('start') additional_columns.append('stop') # Preprocess data, check for NaN and get design matrices df_ref = weakref.ref(df) df_clean, dmatrices, dep_categories = df_to_dmatrices(df, dep, formula, nan_policy, additional_columns) # Add function call arguments for supplemental columns - exposure, status, etc. if exposure is not None: fit_kwargs['exposure'] = df_clean[exposure] if status is not None: fit_kwargs['status'] = df_clean[status] # Bodge for TimeVaryingCox if model_class.__name__ == 'TimeVaryingCox': dmatrices = (dmatrices[0], dmatrices[1].join(df_clean[['index', 'start', 'stop']])) # Fit model result = model_class.fit(dmatrices[0], dmatrices[1], **fit_kwargs) # Fill in general information result.df = df_ref result.dep = dep result.formula = formula result.nan_policy = nan_policy if exp is not None: result.exp = exp result.fitted_dt = datetime.now() result.nobs = len(dmatrices[0]) # Process categorical terms, etc. raw_terms = result.terms result.terms = {} for raw_name, raw_term in raw_terms.items(): # Rename terms if raw_name == '(Intercept)': # Already processed result.terms[raw_name] = raw_term elif raw_name == '(Cutoffs)': result.terms[raw_name] = raw_term if dep_categories is not None: # Need to convert factorised names back into original names old_cutoffs = raw_term.categories raw_term.categories = {} for cutoff_raw_name, cutoff_term in old_cutoffs.items(): bits = cutoff_raw_name.split('/') term = dep_categories[round(float(bits[0]))] + '/' + dep_categories[round(float(bits[1]))] result.terms[raw_name].categories[term] = cutoff_term else: # Parse if required factor, column, contrast = parse_patsy_term(formula, df_clean, raw_name) if contrast is not None: # Categorical term if bool_baselevels is False and contrast == 'True' and set(df[column].unique()) == set([True, False]): # Treat as single term result.terms[column] = raw_term else: # Add a new categorical term if not exists if column not in result.terms: ref_category = formula_factor_ref_category(formula, df_clean, factor) result.terms[column] = CategoricalTerm({}, ref_category) result.terms[column].categories[contrast] = raw_term else: # Single term result.terms[column] = raw_term return result def df_to_dmatrices(df, dep, formula, nan_policy, additional_columns=[]): # Check for/clean NaNs in input columns columns = cols_for_formula(dep, df) + cols_for_formula(formula, df) if additional_columns: columns += additional_columns df = df[columns] df = check_nan(df, nan_policy) # Ensure numeric type for dependent variable if '+' in dep: dep_categories = None else: df[dep], dep_categories = as_numeric(df[dep]) # Convert pandas nullable types for independent variables as this breaks statsmodels df = convert_pandas_nullable(df) # Construct design matrix from formula dmatrices = patsy.dmatrices(dep + ' ~ ' + formula, df, return_type='dataframe') return df, dmatrices, dep_categories class RegressionModel: # TODO: Documentation def __init__(self): # Model configuration (all set in yli.regress) self.df = None self.dep = None self.formula = None self.nan_policy = None self.exp = False self.cov_type = None # Summary information self.fitted_dt = None # Set in yli.regress self.nobs = None # Set in yli.regress self.nevents = None self.dof_model = None self.dof_resid = None self.rsquared = None self.ll_model = None self.ll_null = None self.ll_saturated = None self.f_statistic = None # Parameters self.terms = None self.vcov = None # Comments self.comments = [] @property def model_long_name(self): return None @property def model_short_name(self): return None def _header_table(self, html): """Return the entries for the header table""" # Left column left_col = [] left_col.append(('Dep. Variable:', self.dep)) left_col.append(('Model:', self.model_short_name)) left_col.append(('Date:', self.fitted_dt.strftime('%Y-%m-%d'))) left_col.append(('Time:', self.fitted_dt.strftime('%H:%M:%S'))) if self.cov_type: left_col.append(('Std. Errors:', 'Non-Robust' if self.cov_type == 'nonrobust' else self.cov_type.upper() if self.cov_type.startswith('hc') else self.cov_type)) # Right column right_col = [] right_col.append(('No. Observations:', format(self.nobs, '.0f'))) if self.nevents: right_col.append(('No. Events:', format(self.nevents, '.0f'))) if self.dof_model: right_col.append(('Df. Model:', format(self.dof_model, '.0f'))) if self.dof_resid: right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f'))) if self.rsquared: right_col.append(('R2:' if html else 'R²:', format(self.rsquared, '.2f'))) elif self.ll_null: right_col.append(('Pseudo R2:' if html else 'Pseudo R²:', format(self.pseudo_rsquared, '.2f'))) if self.f_statistic: # Report the F test if available f_result = self.ftest() if html: right_col.append(('F:', format(f_result.statistic, '.2f'))) right_col.append(('p (F):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML))) else: right_col.append(('F:', format(f_result.statistic, '.2f'))) right_col.append(('p (F):', fmt_p(f_result.pvalue, PValueStyle.VALUE_ONLY))) else: # Otherwise report likelihood ratio test as overall test right_col.append(('LL-Model:', format(self.ll_model, '.2f'))) if self.ll_null: lrtest_result = self.lrtest_null() right_col.append(('LL-Null:', format(self.ll_null, '.2f'))) if html: right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY | PValueStyle.HTML))) else: right_col.append(('p (LR):', fmt_p(lrtest_result.pvalue, PValueStyle.VALUE_ONLY))) return left_col, right_col def __repr__(self): if config.repr_is_summary: return self.summary() return super().__repr__() def _repr_html_(self): # Render header table left_col, right_col = self._header_table(html=True) out = ''.format(self.model_long_name) for left_cell, right_cell in itertools.zip_longest(left_col, right_col): out += ''.format( left_cell[0] if left_cell else '', left_cell[1] if left_cell else '', right_cell[0] if right_cell else '', right_cell[1] if right_cell else '' ) out += '
{} Results
{}{}{}{}
' # Render coefficients table out += ''.format('exp(β)' if self.exp else 'β', (1-config.alpha)*100) for term_name, term in self.terms.items(): if isinstance(term, SingleTerm): # Single term # Exponentiate beta if requested beta = term.beta if self.exp: beta = np.exp(beta) out += ''.format(term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML)) elif isinstance(term, CategoricalTerm): # Categorical term out += ''.format(term_name) # Render reference category if term.ref_category is not None: out += ''.format(term.ref_category) # Loop over terms for sub_term_name, sub_term in term.categories.items(): # Exponentiate beta if requested beta = sub_term.beta if self.exp: beta = np.exp(beta) out += ''.format(sub_term_name, beta.point, beta.ci_lower, beta.ci_upper, fmt_p(sub_term.pvalue, PValueStyle.TABULAR | PValueStyle.HTML)) else: raise Exception('Attempt to render unknown term type') out += '
{}({:g}% CI)p
{}{:.2f}({:.2f}{:.2f}){}
{}
{}Ref.
{}{:.2f}({:.2f}{:.2f}){}
' # TODO: Have a detailed view which shows SE, t/z, etc. if self.comments: out += '
    ' for comment in self.comments: out += '
  1. {}
  2. '.format(comment) out += '
' return out def summary(self): """ Return a stringified summary of the regression model :rtype: str """ # Render header table left_col, right_col = self._header_table(html=False) # Ensure equal sizes for SimpleTable if len(right_col) > len(left_col): left_col.extend([('', '')] * (len(right_col) - len(left_col))) elif len(left_col) > len(right_col): right_col.extend([('', '')] * (len(left_col) - len(right_col))) table1 = SimpleTable(np.concatenate([left_col, right_col], axis=1), title='{} Results'.format(self.model_long_name)) table1.insert_stubs(2, [' | '] * len(left_col)) # Get rid of last line (merge with next table) table1_lines = table1.as_text().splitlines(keepends=False) out = '\n'.join(table1_lines[:-1]) + '\n' # Render coefficients table table_data = [] for term_name, term in self.terms.items(): if isinstance(term, SingleTerm): # Single term # Exponentiate beta if requested beta = term.beta if self.exp: beta = np.exp(beta) # Add some extra padding table_data.append([term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(term.pvalue, PValueStyle.TABULAR)]) elif isinstance(term, CategoricalTerm): # Categorical term table_data.append([term_name + ' ', '', '', '', '', '']) # Render reference category if term.ref_category is not None: table_data.append(['{} '.format(term.ref_category), 'Ref.', '', '', '', '']) # Loop over terms for sub_term_name, sub_term in term.categories.items(): # Exponentiate beta if requested beta = sub_term.beta if self.exp: beta = np.exp(beta) table_data.append([sub_term_name + ' ', format(beta.point, '.2f'), '({:.2f}'.format(beta.ci_lower), '-', '{:.2f})'.format(beta.ci_upper), ' ' + fmt_p(sub_term.pvalue, PValueStyle.TABULAR)]) else: raise Exception('Attempt to render unknown term type') table2 = SimpleTable(data=table_data, headers=['', 'exp(β)' if self.exp else 'β', '', '\ue000', '', ' p']) # U+E000 is in Private Use Area, mark middle of CI column table2_text = table2.as_text().replace(' \ue000 ', '({:g}% CI)'.format((1-config.alpha)*100)) # Render heading in the right spot table2_lines = table2_text.splitlines(keepends=False) # Render divider line between 2 tables max_table_len = max(len(table1_lines[-1]), len(table2_lines[-1])) out += '=' * max_table_len + '\n' out += '\n'.join(table2_lines[1:]) if self.comments: out += '\n' for i, comment in enumerate(self.comments): out += '\n{}. {}'.format(i + 1, comment) return out def terms_flat(self): """ Iterate over each :class:`SingleTerm` in *self.terms*, recursively stepping through :class:`CategoricalTerm`\ s """ for t in self.terms.values(): if isinstance(t, CategoricalTerm): for t in t.categories.values(): yield t else: yield t # -------------------- # Post hoc tests, etc. def bayesfactor_beta_zero(self, term): """ Compute a Bayes factor testing the hypothesis that the given beta is 0 Uses the R *BFpack* library. Requires the regression to be from statsmodels. The term must be specified as the *raw name* from the statsmodels regression, available via :attr:`SingleTerm.raw_name`. :param term: Raw name of the term to be tested :type term: str :rtype: :class:`yli.bayes_factors.BayesFactor` """ # FIXME: Allow specifying our renamed terms # Get parameters required for AFBF raw_params = {} for t in self.terms_flat(): raw_params[t.raw_name.replace('[', '_').replace(']', '_')] = t.beta.point # Compute BF matrix bf01 = bayesfactor_afbf(pd.Series(raw_params), self.vcov, self.nobs, '{} = 0'.format(term.replace('[', '_').replace(']', '_'))) bf01 = BayesFactor(bf01.factor, '0', '{} = 0'.format(term), '1', '{} ≠ 0'.format(term)) if bf01.factor >= 1: return bf01 else: return bf01.invert() def deviance_chi2(self): """ Perform the deviance *χ*:sup:`2` test for goodness of fit :rtype: :class:`yli.sig_tests.ChiSquaredResult` """ deviance_model = 2 * (self.ll_saturated - self.ll_model) pvalue = 1 - stats.chi2.cdf(deviance_model, df=self.dof_resid) return ChiSquaredResult(deviance_model, int(self.dof_resid), pvalue) def ftest(self): """ Perform the *F* test that all slopes are 0 :rtype: :class:`yli.sig_tests.FTestResult` """ pvalue = 1 - stats.f(self.dof_model, self.dof_resid).cdf(self.f_statistic) return FTestResult(self.f_statistic, self.dof_model, self.dof_resid, pvalue) def lrtest_null(self): """ Compute the likelihood ratio test comparing the model to the null model :rtype: :class:`LikelihoodRatioTestResult` """ statistic = -2 * (self.ll_null - self.ll_model) pvalue = 1 - stats.chi2.cdf(statistic, self.dof_model) LikelihoodRatioTestResult return LikelihoodRatioTestResult(statistic, self.dof_model, pvalue) @property def pseudo_rsquared(self): """McFadden's pseudo *R*:sup:`2` statistic""" return 1 - self.ll_model/self.ll_null @property def pseudo_rsquared_adj(self): """McFadden's pseudo *R*:sup:`2` statistic, adjusted for number of parameters""" return 1 - (self.ll_model - self.dof_model) / self.ll_null def shap(self, **kwargs): """ Compute SHAP values for the model Uses the Python *shap* library. :param kwargs: Keyword arguments to pass to *shap.LinearExplainer* :rtype: :class:`yli.shap.ShapResult` **Reference:** Lundberg SM, Lee SI. A unified approach to interpreting model predictions. In: Guyon I, Von Luxburg U, Bengio S, et al., editors. *Advances in Neural Information Processing Systems*; 2017 Dec 4–9; Long Beach, CA. https://proceedings.neurips.cc/paper/2017/hash/8a20a8621978632d76c43dfd28b67767-Abstract.html """ import shap xdata = ShapResult._get_xdata(self) # Combine terms into single list params = [] for term in self.terms.values(): if isinstance(term, SingleTerm): params.append(term.beta.point) else: params.extend(s.beta.point for s in term.categories.values()) explainer = shap.LinearExplainer((np.array(params[1:]), params[0]), xdata, **kwargs) # FIXME: Assumes zeroth term is intercept shap_values = explainer.shap_values(xdata).astype('float') return ShapResult(weakref.ref(self), shap_values, list(xdata.columns)) class LikelihoodRatioTestResult(ChiSquaredResult): """ Result of a likelihood ratio test for regression See :meth:`RegressionModel.lrtest_null`. """ def __init__(self, statistic, dof, pvalue): super().__init__(statistic, dof, pvalue) def _repr_html_(self): return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION | PValueStyle.HTML)) def summary(self): """ Return a stringified summary of the likelihood ratio test :rtype: str """ return 'LR({}) = {:.2f}; p {}'.format(self.dof, self.statistic, fmt_p(self.pvalue, PValueStyle.RELATION)) class SingleTerm: """A term in a :class:`RegressionModel` which is a single term""" def __init__(self, raw_name, beta, pvalue): #: Raw name of the term (*str*) self.raw_name = raw_name #: :class:`yli.utils.Estimate` of the coefficient self.beta = beta #: *p* value for the coefficient (*float*) self.pvalue = pvalue class CategoricalTerm: """A group of terms in a :class:`RegressionModel` corresponding to a categorical variable""" def __init__(self, categories, ref_category): #: Terms for each of the categories, excluding the reference category (*dict* of :class:`SingleTerm`) self.categories = categories #: Name of the reference category (*str*) self.ref_category = ref_category def raw_terms_from_statsmodels_result(raw_result, *, wrapped=True): if wrapped: zipped_iter = zip(raw_result.model.exog_names, raw_result.params.values, raw_result.conf_int(config.alpha).values, raw_result.pvalues.values) else: zipped_iter = zip(raw_result.model.exog_names, raw_result.params, raw_result.conf_int(config.alpha), raw_result.pvalues) return { ('(Intercept)' if raw_name == 'Intercept' else raw_name): SingleTerm( raw_name=raw_name, beta=Estimate(raw_param, raw_ci[0], raw_ci[1]), pvalue=raw_p ) for raw_name, raw_param, raw_ci, raw_p in zipped_iter } # ------------------------ # Concrete implementations class Cox(RegressionModel): # TODO: Documentation @property def model_long_name(self): return 'Cox Regression' @property def model_short_name(self): return 'Cox' @classmethod def fit(cls, data_dep, data_ind, *, status=None): # Drop explicit intercept term if 'Intercept' in data_ind: del data_ind['Intercept'] result = cls() result.exp = True result.cov_type = 'nonrobust' result.nevents = status.sum() result.dof_model = len(data_ind.columns) # Perform regression raw_result = sm.PHReg(endog=data_dep, exog=data_ind, status=status, missing='raise').fit(disp=False) result.ll_model = raw_result.llf result.terms = raw_terms_from_statsmodels_result(raw_result, wrapped=False) result.vcov = raw_result.cov_params() return result class GLM(RegressionModel): # TODO: Documentation @property def model_long_name(self): return 'Generalised Linear Model' @property def model_short_name(self): return 'GLM' @classmethod def fit(cls, data_dep, data_ind, family, method='irls', maxiter=None, start_params=None): result = cls() result.exp = True result.cov_type = 'nonrobust' if family == 'binomial': sm_family = sm.families.Binomial() elif family == 'gamma': sm_family = sm.families.Gamma() elif family == 'gaussian': sm_family = sm.families.Gaussian() elif family == 'inverse_gaussian': sm_family = sm.families.InverseGaussian() elif family == 'negative_binomial': sm_family = sm.families.NegativeBinomial() elif family == 'poisson': sm_family = sm.families.Poisson() elif family == 'tweedie': sm_family = sm.families.Tweedie() else: raise ValueError('Unknown GLM family') # Perform regression raw_result = sm.GLM(endog=data_dep, exog=data_ind, family=sm_family, missing='raise').fit(disp=False, method=method, start_params=start_params) result.dof_model = raw_result.df_model result.dof_resid = raw_result.df_resid result.ll_model = raw_result.llf result.ll_null = raw_result.llnull result.terms = raw_terms_from_statsmodels_result(raw_result) result.vcov = raw_result.cov_params() return result class IntervalCensoredCox(RegressionModel): """ Interval-censored Cox regression Uses hpstat *intcox* command. **Example:** .. code-block:: df = pd.DataFrame(...) yli.regress(yli.IntervalCensoredCox, df, 'Left_Time + Right_Time', 'A + B + C + D + E + F + G + H') .. code-block:: text Interval-Censored Cox Regression Results =================================================================== Dep. Variable: Left_Time + Right_Time | No. Observations: 1124 Model: Interval-Censored Cox | No. Events: 133 Date: 2023-04-17 | Df. Model: 8 Time: 22:30:40 | Pseudo R²: 0.00 Std. Errors: OPG | LL-Model: -613.21 | LL-Null: -615.81 | p (LR): 0.82 =================================================================== exp(β) (95% CI) p ---------------------------------- A 0.83 (0.37 - 1.88) 0.66 B 1.08 (0.81 - 1.46) 0.60 C 0.49 (0.24 - 1.00) 0.052 D 0.79 (0.42 - 1.50) 0.48 E 0.87 (0.40 - 1.85) 0.71 F 0.64 (0.28 - 1.45) 0.29 G 1.07 (0.44 - 2.62) 0.88 H 1.23 (0.48 - 3.20) 0.67 ---------------------------------- The output summarises the result of the regression. **Reference:** Zeng D, Mao L, Lin DY. Maximum likelihood estimation for semiparametric transformation models with interval-censored data. *Biometrika*. 2016;103(2):253–71. `doi:10.1093/biomet/asw013 `_ """ @property def model_long_name(self): return 'Interval-Censored Cox Regression' @property def model_short_name(self): return 'Interval-Censored Cox' def __init__(self): super().__init__() # TODO: Documentation self.lambda_ = None @classmethod def fit(cls, data_dep, data_ind, *, reduced=False, maxiter=None, tolerance=None): if len(data_dep.columns) != 2: raise ValueError('IntervalCensoredCox requires left and right times') # Drop explicit intercept term if 'Intercept' in data_ind: del data_ind['Intercept'] result = cls() result.exp = True result.cov_type = 'OPG' result.nevents = np.isfinite(data_dep.iloc[:, 1]).sum() result.dof_model = len(data_ind.columns) # Prepare arguments intcox_args = [config.hpstat_path, 'intcox', '-', '--output', 'json'] if reduced: intcox_args.append('--reduced') if maxiter: intcox_args.append('--max-iterations') intcox_args.append(str(maxiter)) if tolerance: intcox_args.append('--param-tolerance') intcox_args.append(str(tolerance)) # Export data to CSV csv_buf = io.StringIO() data_dep.join(data_ind).to_csv(csv_buf, index=False) csv_str = csv_buf.getvalue() # Run intcens binary proc = subprocess.run(intcox_args, input=csv_str, stdout=subprocess.PIPE, stderr=None, encoding='utf-8', check=True) raw_result = json.loads(proc.stdout) from IPython.display import clear_output clear_output(wait=True) z_critical = -stats.norm.ppf(config.alpha / 2) result.terms = {raw_name: SingleTerm( raw_name=raw_name, beta=Estimate(raw_param, raw_param - z_critical * raw_se, raw_param + z_critical * raw_se), pvalue=stats.norm.cdf(-np.abs(raw_param) / raw_se) * 2 ) for raw_name, raw_param, raw_se in zip(data_ind.columns, raw_result['params'], raw_result['params_se'])} result.cumulative_hazard = pd.Series(data=raw_result['cumulative_hazard'], index=raw_result['cumulative_hazard_times']) result.ll_model = raw_result['ll_model'] result.ll_null = raw_result['ll_null'] return result def survival_function(self): # TODO: Documentation return np.exp(-self.cumulative_hazard) def plot_survival_function(self, ax=None): # TODO: Documentation import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots() sf = self.survival_function() # Draw straight lines # np.concatenate(...) to force starting drawing from time 0, survival 100% xpoints = np.concatenate([[0], sf.index]).repeat(2)[1:] ypoints = np.concatenate([[1], sf]).repeat(2)[:-1] ax.plot(xpoints, ypoints) ax.set_xlabel('Analysis time') ax.set_ylabel('Survival probability') ax.set_xlim(left=0) ax.set_ylim(0, 1) def plot_loglog_survival(self, ax=None): # TODO: Documentation import matplotlib.pyplot as plt if ax is None: fig, ax = plt.subplots() sf = self.survival_function() # Draw straight lines xpoints = np.log(sf.index).to_numpy().repeat(2)[1:] ypoints = (-np.log(-np.log(sf))).to_numpy().repeat(2)[:-1] ax.plot(xpoints, ypoints) ax.set_xlabel('ln(Analysis time)') ax.set_ylabel('−ln(−ln(Survival probability))') class Logit(RegressionModel): """ Logistic regression **Example:** .. code-block:: df = pd.DataFrame({ 'Unhealthy': [False, False, False, ...], 'Fibrinogen': [2.52, 2.46, 2.29, ...], 'GammaGlobulin': [38, 36, 36, ...] }) yli.regress(yli.Logit, df, 'Unhealthy', 'Fibrinogen + GammaGlobulin') .. code-block:: text Logistic Regression Results ====================================================== Dep. Variable: Unhealthy | No. Observations: 32 Model: Logit | Df. Model: 2 Date: 2022-10-18 | Df. Residuals: 29 Time: 19:00:34 | Pseudo R²: 0.26 Std. Errors: Non-Robust | LL-Model: -11.47 | LL-Null: -15.44 | p (LR): 0.02* ====================================================== exp(β) (95% CI) p ----------------------------------------------- (Intercept) 0.00 (0.00 - 0.24) 0.03* Fibrinogen 6.80 (1.01 - 45.79) 0.049* GammaGlobulin 1.17 (0.92 - 1.48) 0.19 ----------------------------------------------- The output summarises the results of the regression. Note that the parameter estimates are automatically exponentiated. For example, the adjusted odds ratio for unhealthiness per unit increase in fibrinogen is 6.80, with 95% confidence interval 1.01–45.79, and is significant with *p* value 0.049. """ @property def model_long_name(self): return 'Logistic Regression' @property def model_short_name(self): return 'Logit' @classmethod def fit(cls, data_dep, data_ind): result = cls() result.exp = True result.cov_type = 'nonrobust' # Perform regression raw_result = sm.Logit(endog=data_dep, exog=data_ind, missing='raise').fit(disp=False) result.dof_model = raw_result.df_model result.dof_resid = raw_result.df_resid result.ll_model = raw_result.llf result.ll_null = raw_result.llnull result.terms = raw_terms_from_statsmodels_result(raw_result) result.vcov = raw_result.cov_params() return result class OLS(RegressionModel): """ Ordinary least squares linear regression **Example:** .. code-block:: df = pd.DataFrame(...) yli.regress(yli.OLS, df, 'LNC', 'D + T1 + T2 + S + PR + NE + CT + BW + N + PT') .. code-block:: text Ordinary Least Squares Regression Results ======================================================= Dep. Variable: LNC | No. Observations: 32 Model: OLS | Df. Model: 10 Date: 2023-04-16 | Df. Residuals: 21 Time: 23:34:01 | R²: 0.86 Std. Errors: Non-Robust | F: 13.28 | p (F): <0.001* ======================================================= β (95% CI) p ---------------------------------------------- (Intercept) -10.63 (-22.51 - 1.24) 0.08 D 0.23 (0.05 - 0.41) 0.02* T1 0.01 (-0.04 - 0.05) 0.82 T2 0.01 (-0.00 - 0.02) 0.24 S 0.00 (0.00 - 0.00) <0.001* PR -0.11 (-0.28 - 0.07) 0.21 NE 0.26 (0.09 - 0.42) 0.004* CT 0.12 (-0.03 - 0.26) 0.12 BW 0.04 (-0.18 - 0.26) 0.73 N -0.01 (-0.03 - 0.00) 0.14 PT -0.22 (-0.49 - 0.05) 0.10 ---------------------------------------------- The output summarises the results of the regression. For example, the adjusted mean difference in "LNC" per unit increase in "D" is 0.23, with 95% confidence interval 0.05–0.41, and is significant with *p* value 0.02. """ @property def model_long_name(self): return 'Ordinary Least Squares Regression' @property def model_short_name(self): return 'OLS' @classmethod def fit(cls, data_dep, data_ind): result = cls() result.cov_type = 'nonrobust' # Perform regression raw_result = sm.OLS(endog=data_dep, exog=data_ind, missing='raise', hasconst=True).fit() result.dof_model = raw_result.df_model result.dof_resid = raw_result.df_resid result.rsquared = raw_result.rsquared result.ll_model = raw_result.llf result.f_statistic = raw_result.fvalue result.terms = raw_terms_from_statsmodels_result(raw_result) result.vcov = raw_result.cov_params() return result class OrdinalLogit(RegressionModel): """ Ordinal logistic (or probit) regression The implementation calls statsmodels' native *OrderedModel*, but substitutes an alternative parameterisation used by R and Stata. In the native statsmodels implementation, the first cutoff parameter is the true cutoff, but further cutoff parameter are log differences between consecutive cutoffs. In this parameterisation, cutoff terms are represented directly in the model. **Example:** .. code-block:: df = pd.DataFrame(...) yli.regress(yli.OrdinalLogit, df, 'apply', 'pared + public + gpa', exp=False) .. code-block:: text Ordinal Logistic Regression Results ========================================================== Dep. Variable: apply | No. Observations: 400 Model: Ordinal Logit | Df. Model: 5 Date: 2022-12-02 | Df. Residuals: 395 Time: 21:30:38 | Pseudo R²: 0.03 Std. Errors: Non-Robust | LL-Model: -358.51 | LL-Null: -370.60 | p (LR): <0.001* ============================================================ β (95% CI) p ------------------------------------------------------------ pared 1.05 (0.53 - 1.57) <0.001* public -0.06 (-0.64 - 0.53) 0.84 gpa 0.62 (0.10 - 1.13) 0.02* (Cutoffs) unlikely/somewhat likely 2.20 (0.68 - 3.73) 0.005* somewhat likely/very likely 4.30 (2.72 - 5.88) <0.001* ------------------------------------------------------------ The output summarises the result of the regression. The parameters shown under "(Cutoffs)" are the cutoff values in the latent variable parameterisation of ordinal regression. Note that because `exp=False` was passed, the parameter estimates are not automatically exponentiated. """ @property def model_long_name(self): return 'Ordinal Logistic Regression' @property def model_short_name(self): return 'Ordinal Logit' @classmethod def fit(cls, data_dep, data_ind): result = cls() result.cov_type = 'nonrobust' # Drop explicit intercept term if 'Intercept' in data_ind: del data_ind['Intercept'] # Perform regression raw_result = _SMOrdinalLogit(endog=data_dep, exog=data_ind, missing='raise').fit(disp=False) result.dof_model = raw_result.df_model result.dof_resid = raw_result.df_resid result.ll_model = raw_result.llf result.ll_null = raw_result.llnull raw_terms = raw_terms_from_statsmodels_result(raw_result) result.terms = {} # Group ordinal regression cutoffs for raw_name, raw_term in raw_terms.items(): if '/' in raw_name: if '(Cutoffs)' not in result.terms: result.terms['(Cutoffs)'] = CategoricalTerm({}, None) result.terms['(Cutoffs)'].categories[raw_name] = raw_term else: result.terms[raw_name] = raw_term return result def brant(self): """ Perform the Brant test for the parallel regression assumption in ordinal regression :rtype: :class:`BrantResult` **Example:** .. code-block:: df = pd.DataFrame(...) model = yli.regress(yli.OrdinalLogit, df, 'apply', 'pared + public + gpa', exp=False) model.brant() .. code-block:: text χ² df p Omnibus 4.34 3 0.23 pared 0.13 1 0.72 public 3.44 1 0.06 gpa 0.18 1 0.67 The output shows the result of the Brant test. For example, for the omnibus test of the parallel regression assumption across all independent variables, the *χ*:sup:`2` statistic is 4.34, the *χ*:sup:`2` distribution has 3 degrees of freedom, and the test is not significant, with *p* value 0.23. **Reference:** Brant R. Assessing proportionality in the proportional odds model for ordinal logistic regression. *Biometrics*. 1990;46(4):1171–8. `doi:10.2307/2532457 `_ """ df = self.df() if df is None: raise Exception('Referenced DataFrame has been dropped') dep = self.dep # Precompute design matrices _, dmatrices, dep_categories = df_to_dmatrices(df, dep, self.formula, 'omit') s_dep = dmatrices[0][dmatrices[0].columns[0]] # df[dep] as series - must get this from dmatrices to account for as_numeric dmatrix_right = dmatrices[1] dmatrix_right.reset_index(drop=True, inplace=True) # Otherwise this confuses matrix multiplication # Fit individual logistic regressions logit_models = [] for upper_limit in sorted(s_dep.unique())[:-1]: dep_dichotomous = (s_dep <= upper_limit).astype(int).reset_index(drop=True) logit_result = sm.Logit(dep_dichotomous, dmatrix_right).fit(disp=False) if not logit_result.mle_retvals['converged']: raise Exception('Maximum likelihood estimation failed to converge for {} <= {}. Check raw_result.mle_retvals.'.format(dep, upper_limit)) if pd.isna(logit_result.bse).any(): raise Exception('Regression returned NaN standard errors for {} <= {}.'.format(dep, upper_limit)) logit_models.append(logit_result) logit_betas = np.array([model._results.params for model in logit_models]).T logit_pihat = np.array([expit(-model.fittedvalues) for model in logit_models]).T # Predicted probabilities # vcov is the variance-covariance matrix of all individually fitted betas across all terms # | model 1 | model 2 | model 3 | ... # | term 1 | term 2 | term 1 | term 2 | term 1 | term 2 | ... # model 1 | term 1 | # | term 2 | # model 2 | term 1 | # | term 2 | # ... n_terms = len(dmatrix_right.columns) - 1 # number of beta terms (excluding intercept) n_betas = len(logit_models) * n_terms vcov = np.zeros((n_betas, n_betas)) # Populate the variance-covariance matrix for comparisons between individually fitted models for j in range(0, len(logit_models) - 1): for l in range(j + 1, len(logit_models)): Wjj = np.diag(logit_pihat[:,j] - logit_pihat[:,j] * logit_pihat[:,j]) Wjl = np.diag(logit_pihat[:,l] - logit_pihat[:,j] * logit_pihat[:,l]) Wll = np.diag(logit_pihat[:,l] - logit_pihat[:,l] * logit_pihat[:,l]) matrix_result = np.linalg.inv(dmatrix_right.T @ Wjj @ dmatrix_right) @ dmatrix_right.T @ Wjl @ dmatrix_right @ np.linalg.inv(dmatrix_right.T @ Wll @ dmatrix_right) j_vs_l_vcov = np.asarray(matrix_result)[1:,1:] # Asymptotic covariance for j,l vcov[j*n_terms:(j+1)*n_terms, l*n_terms:(l+1)*n_terms] = j_vs_l_vcov vcov[l*n_terms:(l+1)*n_terms, j*n_terms:(j+1)*n_terms] = j_vs_l_vcov # Populate the variance-covariance matrix within each individually fitted model for i in range(len(logit_models)): vcov[i*n_terms:(i+1)*n_terms, i*n_terms:(i+1)*n_terms] = logit_models[i]._results.cov_params()[1:,1:] # ------------------ # Perform Wald tests beta_names = ['{}_{}'.format(raw_name, i) for i in range(len(logit_models)) for raw_name in dmatrix_right.columns[1:]] wald_results = {} # Omnibus test constraints = [' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models))) for raw_name in dmatrix_right.columns[1:]] constraint = ', '.join(constraints) dof = (len(logit_models) - 1) * (len(dmatrix_right.columns) - 1) # df = (number of levels minus 2) * (number of terms excluding intercept) wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, dof) wald_results['Omnibus'] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue) # Individual terms for raw_name in dmatrix_right.columns[1:]: constraint = ' = '.join('{}_{}'.format(raw_name, i) for i in range(len(logit_models))) dof = len(logit_models) - 1 # df = (number of levels minus 2) wald_result = _wald_test(beta_names, logit_betas[1:].ravel('F'), constraint, vcov, dof) wald_results[raw_name] = ChiSquaredResult(wald_result.statistic, wald_result.df_denom, wald_result.pvalue) return BrantResult(wald_results) class _SMOrdinalLogit(statsmodels.miscmodels.ordinal_model.OrderedModel): def __init__(self, endog, exog, **kwargs): if 'distr' not in kwargs: kwargs['distr'] = 'logit' super().__init__(endog, exog, **kwargs) def transform_threshold_params(self, params): th_params = params[-(self.k_levels - 1):] thresh = np.concatenate(([-np.inf], th_params, [np.inf])) return thresh def transform_reverse_threshold_params(self, params): return params[:-1] class PenalisedLogit(RegressionModel): """ Firth penalised logistic regression Uses the R *logistf* library. **Example:** .. code-block:: df = pd.DataFrame({ 'Pred': [1] * 20 + [0] * 220, 'Outcome': [1] * 40 + [0] * 200 }) yli.regress(yli.PenalisedLogit, df, 'Outcome', 'Pred', exp=False) .. code-block:: text Penalised Logistic Regression Results ============================================================ Dep. Variable: Outcome | No. Observations: 240 Model: Penalised Logit | Df. Model: 1 Date: 2022-10-19 | Pseudo R²: 0.37 Time: 07:50:40 | LL-Model: -66.43 Std. Errors: Non-Robust | LL-Null: -105.91 | p (LR): <0.001* ============================================================ β (95% CI) p --------------------------------------------- (Intercept) -2.28 (-2.77 - -1.85) <0.001* Pred 5.99 (3.95 - 10.85) <0.001* --------------------------------------------- The output summarises the result of the regression. The summary table shows that a penalised method was applied. Note that because `exp=False` was passed, the parameter estimates are not automatically exponentiated. """ @property def model_long_name(self): return 'Penalised Logistic Regression' @property def model_short_name(self): return 'Penalised Logit' @classmethod def fit(cls, data_dep, data_ind): result = cls() result.cov_type = 'nonrobust' import rpy2.robjects as ro import rpy2.robjects.packages import rpy2.robjects.pandas2ri df = data_dep.join(data_ind) # Drop explicit intercept term if 'Intercept' in df: del df['Intercept'] # Import logistf ro.packages.importr('logistf') with ro.conversion.localconverter(ro.default_converter + ro.pandas2ri.converter): with ro.local_context() as lc: # Convert DataFrame to R lc['df'] = df # Transfer other parameters to R lc['formula_'] = df.columns[0] + ' ~ ' + ' + '.join(df.columns[1:]) lc['alpha'] = config.alpha # Fit the model model = ro.r('logistf(formula_, data=df, alpha=alpha)') result.dof_model = model['df'][0] result.ll_model = model['loglik'][0] result.ll_null = model['loglik'][1] result.terms = {t: SingleTerm(t, Estimate(b, ci0, ci1), p) for t, b, ci0, ci1, p in zip(model['terms'], model['coefficients'], model['ci.lower'], model['ci.upper'], model['prob'])} return result class Poisson(RegressionModel): """ Poisson regression **Example:** .. code-block:: df = pd.DataFrame(...) yli.regress(yli.Poisson, df, 'num_awards', 'C(prog) + math') .. code-block:: text Poisson Regression Results ======================================================= Dep. Variable: num_awards | No. Observations: 200 Model: Poisson | Df. Model: 3 Date: 2023-04-22 | Df. Residuals: 196 Time: 16:58:21 | Pseudo R²: 0.21 Std. Errors: Non-Robust | LL-Model: -182.75 | LL-Null: -231.86 | p (LR): <0.001* ======================================================= exp(β) (95% CI) p -------------------------------------------- (Intercept) 0.01 (0.00 - 0.02) <0.001* prog 1 Ref. 2 2.96 (1.46 - 5.97) 0.002* 3 1.45 (0.61 - 3.44) 0.40 math 1.07 (1.05 - 1.10) <0.001* -------------------------------------------- The output summarises the results of the regression. For example, the adjusted incidence rate ratio in "num_awards" per unit increase in "math" is 1.07, with 95% confidence interval 1.05–1.10, and is significant with *p* value < 0.001. """ @property def model_long_name(self): return 'Poisson Regression' @property def model_short_name(self): return 'Poisson' @classmethod def fit(cls, data_dep, data_ind, exposure=None, method='newton', maxiter=None, start_params=None): result = cls() result.exp = True result.cov_type = 'nonrobust' # Perform regression raw_result = sm.Poisson(endog=data_dep, exog=data_ind, exposure=exposure, missing='raise').fit(disp=False, method=method, start_params=start_params) result.dof_model = raw_result.df_model result.dof_resid = raw_result.df_resid result.ll_model = raw_result.llf result.ll_null = raw_result.llnull # Compute saturated log-likelihood result.ll_saturated = float(sm.families.Poisson().loglike(data_dep + 1e-10, data_dep + 1e-10)) result.terms = raw_terms_from_statsmodels_result(raw_result) result.vcov = raw_result.cov_params() return result class TimeVaryingCox(RegressionModel): # TODO: Documentation # Requires df to be in lifelines long format @property def model_long_name(self): return 'Time-Varying Cox Regression' @property def model_short_name(self): return 'Time-Varying Cox' @classmethod def fit(cls, data_dep, data_ind, exposure=None, method='newton', maxiter=None, start_params=None): result = cls() result.exp = True result.cov_type = 'nonrobust' import lifelines # Perform regression ctv = lifelines.CoxTimeVaryingFitter(penalizer=0) ctv.fit(data_dep.join(data_ind), id_col='index', event_col=data_dep.columns[0], start_col='start', stop_col='stop') result.nobs = ctv._n_unique result.nevents = ctv.event_observed.sum() result.dof_model = len(ctv.params_) result.ll_model = ctv.log_likelihood_ result.ll_null = ctv._log_likelihood_null result.terms = { raw_name: SingleTerm( raw_name=raw_name, beta=Estimate(raw_param, raw_ci[0], raw_ci[1]), pvalue=raw_p ) for raw_name, raw_param, raw_ci, raw_p in zip(ctv.params_.index, ctv.params_, ctv.confidence_intervals_.itertuples(index=False), ctv._compute_p_values()) } result.vcov = ctv.variance_matrix_ return result # ------------------ # Brant test helpers class _Dummy: pass def _wald_test(param_names, params, formula, vcov, df): # Hack! Piggyback off statsmodels to compute a Wald test lmr = statsmodels.base.model.LikelihoodModelResults(model=None, params=None) lmr.model = _Dummy() lmr.model.data = _Dummy() lmr.model.data.cov_names = param_names lmr.params = params lmr.df_resid = df return lmr.wald_test(formula, cov_p=vcov, use_f=False, scalar=True) class BrantResult: """ Result of a Brant test for ordinal regression See :meth:`OrdinalLogit.brant`. """ def __init__(self, tests): #: Results for Brant test on each coefficient (*Dict[str, ChiSquaredResult]*) self.tests = tests def __repr__(self): if config.repr_is_summary: return self.summary() return super().__repr__() def _repr_html_(self): out = '' for raw_name, test in self.tests.items(): out += ''.format(raw_name, test.statistic, test.dof, fmt_p(test.pvalue, PValueStyle.TABULAR | PValueStyle.HTML)) out += '
Brant Test Results
χ2dfp
{}{:.2f}{:.0f}{}
' return out def summary(self): """ Return a stringified summary of the *χ*:sup:`2` test :rtype: str """ table = pd.DataFrame([ ['{:.2f}'.format(test.statistic), '{:.0f}'.format(test.dof), fmt_p(test.pvalue, PValueStyle.TABULAR)] for test in self.tests.values() ], index=self.tests.keys(), columns=['χ² ', 'df', 'p ']) return str(table)