From dbfcec56c326a6b32fbc2ec61f404abdfc97a891 Mon Sep 17 00:00:00 2001 From: RunasSudo Date: Sun, 5 Mar 2023 02:11:12 +1100 Subject: [PATCH] Implement yli.cox_interval_censored --- yli/__init__.py | 2 +- yli/regress.py | 113 ++++++++++++++++++++++++------------------------ yli/survival.py | 95 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 152 insertions(+), 58 deletions(-) diff --git a/yli/__init__.py b/yli/__init__.py index f738b32..c93f752 100644 --- a/yli/__init__.py +++ b/yli/__init__.py @@ -22,7 +22,7 @@ from .graphs import init_fonts from .io import pickle_read_compressed, pickle_read_encrypted, pickle_write_compressed, pickle_write_encrypted from .regress import OrdinalLogit, PenalisedLogit, logit_then_regress, regress, vif from .sig_tests import anova_oneway, auto_univariable, chi2, mannwhitney, pearsonr, spearman, ttest_ind -from .survival import kaplanmeier, logrank, turnbull +from .survival import cox_interval_censored, kaplanmeier, logrank, turnbull from .utils import as_ordinal def reload_me(): diff --git a/yli/regress.py b/yli/regress.py index 89d2100..4631ea5 100644 --- a/yli/regress.py +++ b/yli/regress.py @@ -19,7 +19,7 @@ import pandas as pd import patsy from scipy import stats from scipy.special import expit -import statsmodels, statsmodels.miscmodels.ordinal_model +import statsmodels, statsmodels.base.model, statsmodels.miscmodels.ordinal_model import statsmodels.api as sm from statsmodels.iolib.table import SimpleTable from statsmodels.stats.outliers_influence import variance_inflation_factor @@ -513,7 +513,8 @@ class RegressionResult: right_col.append(('No. Observations:', format(self.nobs, '.0f'))) if self.nevents: right_col.append(('No. Events:', format(self.nevents, '.0f'))) - right_col.append(('Df. Model:', format(self.dof_model, '.0f'))) + if self.dof_model: + right_col.append(('Df. Model:', format(self.dof_model, '.0f'))) if self.dof_resid: right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f'))) if self.rsquared: @@ -943,7 +944,7 @@ def regress( if model_class is sm.OLS: method_name = 'Least Squares' elif model_class is sm.PHReg: - method_name = 'MPLE' + method_name = 'MLE' else: # Parse raw regression results to get fit method # Avoid this in general as it can be expensive to summarise all the post hoc tests, etc. @@ -1013,6 +1014,59 @@ def logit_then_regress(model_class, df, dep, formula, *, nan_policy='warn', **kw # Perform desired regression return regress(model_class, df, dep, formula, start_params=logit_params, **kwargs) +class _Dummy: pass + +def _wald_test(param_names, params, formula, vcov, df): + # Hack! Piggyback off statsmodels to compute a Wald test + + lmr = statsmodels.base.model.LikelihoodModelResults(model=None, params=None) + lmr.model = _Dummy() + lmr.model.data = _Dummy() + lmr.model.data.cov_names = param_names + lmr.params = params + lmr.df_resid = df + + return lmr.wald_test(formula, cov_p=vcov, use_f=False, scalar=True) + +class BrantResult: + """ + Result of a Brant test for ordinal regression + + See :meth:`RegressionResult.brant`. + """ + + def __init__(self, tests): + #: Results for Brant test on each coefficient (*Dict[str, ChiSquaredResult]*) + self.tests = tests + + def __repr__(self): + if config.repr_is_summary: + return self.summary() + return super().__repr__() + + def _repr_html_(self): + out = '' + + for raw_name, test in self.tests.items(): + out += ''.format(raw_name, test.statistic, test.dof, fmt_p(test.pvalue, PValueStyle.TABULAR | PValueStyle.HTML)) + + out += '
Brant Test Results
χ2dfp
{}{:.2f}{:.0f}{}
' + return out + + def summary(self): + """ + Return a stringified summary of the *χ*:sup:`2` test + + :rtype: str + """ + + table = pd.DataFrame([ + ['{:.2f}'.format(test.statistic), '{:.0f}'.format(test.dof), fmt_p(test.pvalue, PValueStyle.TABULAR)] + for test in self.tests.values() + ], index=self.tests.keys(), columns=['χ² ', 'df', 'p ']) + + return str(table) + # ----------------------------- # Penalised logistic regression @@ -1155,56 +1209,3 @@ class OrdinalLogit(statsmodels.miscmodels.ordinal_model.OrderedModel): def transform_reverse_threshold_params(self, params): return params[:-1] - -class _Dummy: pass - -def _wald_test(param_names, params, formula, vcov, df): - # Hack! Piggyback off statsmodels to compute a Wald test - - lmr = statsmodels.base.model.LikelihoodModelResults(model=None, params=None) - lmr.model = _Dummy() - lmr.model.data = _Dummy() - lmr.model.data.cov_names = param_names - lmr.params = params - lmr.df_resid = df - - return lmr.wald_test(formula, cov_p=vcov, use_f=False, scalar=True) - -class BrantResult: - """ - Result of a Brant test for ordinal regression - - See :meth:`RegressionResult.brant`. - """ - - def __init__(self, tests): - #: Results for Brant test on each coefficient (*Dict[str, ChiSquaredResult]*) - self.tests = tests - - def __repr__(self): - if config.repr_is_summary: - return self.summary() - return super().__repr__() - - def _repr_html_(self): - out = '' - - for raw_name, test in self.tests.items(): - out += ''.format(raw_name, test.statistic, test.dof, fmt_p(test.pvalue, PValueStyle.TABULAR | PValueStyle.HTML)) - - out += '
Brant Test Results
χ2dfp
{}{:.2f}{:.0f}{}
' - return out - - def summary(self): - """ - Return a stringified summary of the *χ*:sup:`2` test - - :rtype: str - """ - - table = pd.DataFrame([ - ['{:.2f}'.format(test.statistic), '{:.0f}'.format(test.dof), fmt_p(test.pvalue, PValueStyle.TABULAR)] - for test in self.tests.values() - ], index=self.tests.keys(), columns=['χ² ', 'df', 'p ']) - - return str(table) diff --git a/yli/survival.py b/yli/survival.py index 1d48acb..539f741 100644 --- a/yli/survival.py +++ b/yli/survival.py @@ -14,12 +14,17 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +import numpy as np from scipy import stats import statsmodels.api as sm from .config import config from .sig_tests import ChiSquaredResult -from .utils import check_nan +from .regress import RegressionResult, SingleTerm +from .utils import Estimate, check_nan, cols_for_formula, convert_pandas_nullable + +from datetime import datetime +import weakref def kaplanmeier(df, time, status, by=None, *, ci=True, transform_x=None, transform_y=None, nan_policy='warn'): """ @@ -271,3 +276,91 @@ def logrank(df, time, status, by, nan_policy='warn'): statistic, pvalue = sm.duration.survdiff(df[time], df[status], df[by]) return ChiSquaredResult(statistic=statistic, dof=1, pvalue=pvalue) + +# -------------------------------- +# Interval-censored Cox regression + +def cox_interval_censored( + df, time_left, time_right, formula, *, + bootstrap_samples=100, + nan_policy='warn', + bool_baselevels=False, exp=True, +): + # TODO: Documentation + + df_ref = weakref.ref(df) + + # Check for/clean NaNs in input columns + columns = [time_left, time_right] + cols_for_formula(formula, df) + + df = df[columns] + df = check_nan(df, nan_policy) + + # FIXME: Ensure numeric type for dependent variable + #df[dep], dep_categories = as_numeric(df[dep]) + if df[time_left].dtype != 'float64' or df[time_right].dtype != 'float64': + raise NotImplementedError('Time dtypes must be float64') + + # Convert pandas nullable types for independent variables + df = convert_pandas_nullable(df) + + # --------- + # Fit model + + # lifelines.CoxPHFitter doesn't do confidence intervals so we use R + + import rpy2.robjects as ro + import rpy2.robjects.packages + import rpy2.robjects.pandas2ri + + # Convert bool to int otherwise rpy2 chokes + df = df.replace({False: 0, True: 1}) + + # Import icenReg + ro.packages.importr('icenReg') + + with ro.conversion.localconverter(ro.default_converter + ro.pandas2ri.converter): + with ro.local_context() as lc: + # Convert DataFrame to R + lc['df'] = df + + # Transfer other parameters to R + lc['formula_'] = 'Surv({}, {}, type="interval2") ~ {}'.format(time_left, time_right, formula) + lc['bootstrap_samples'] = bootstrap_samples + + # FIXME: Seed bootstrap RNG? + + # Fit the model + ro.r('model <- ic_sp(as.formula(formula_), data=df, bs_samples=bootstrap_samples)') + + model = ro.r('model') + # Hard to access attributes through rpy2 + term_parameters = ro.r('model$coef') + term_names = ro.r('names(model$coef)') + term_cis = ro.r('confint(model)') + cov_matrix = ro.r('model$var') + llf = ro.r('model$llk')[0] + + # TODO: Handle categorical terms? + terms = {} + for i in range(len(term_parameters)): + # These values not directly exposed so we must calculate them + se = np.sqrt(cov_matrix[i, i]) + pvalue = 2 * stats.norm(loc=0, scale=se).cdf(-np.abs(term_parameters[i])) + + term = SingleTerm(term_names[i], Estimate(term_parameters[i], term_cis[i][0], term_cis[i][1]), pvalue) + terms[term_names[i]] = term + + result = RegressionResult( + None, df_ref, '({}, {}]'.format(time_left, time_right), formula, nan_policy, None, None, + model, + 'Interval-Censored Cox Regression', 'CoxIC', 'MLE', + len(df), None, None, datetime.now(), 'Bootstrap', + terms, + llf, None, + None, None, None, + [], + exp + ) + + return result