Implement yli.cox_interval_censored

This commit is contained in:
RunasSudo 2023-03-05 02:11:12 +11:00
parent e12dd65fdc
commit dbfcec56c3
Signed by: RunasSudo
GPG Key ID: 7234E476BF21C61A
3 changed files with 152 additions and 58 deletions

View File

@ -22,7 +22,7 @@ from .graphs import init_fonts
from .io import pickle_read_compressed, pickle_read_encrypted, pickle_write_compressed, pickle_write_encrypted
from .regress import OrdinalLogit, PenalisedLogit, logit_then_regress, regress, vif
from .sig_tests import anova_oneway, auto_univariable, chi2, mannwhitney, pearsonr, spearman, ttest_ind
from .survival import kaplanmeier, logrank, turnbull
from .survival import cox_interval_censored, kaplanmeier, logrank, turnbull
from .utils import as_ordinal
def reload_me():

View File

@ -19,7 +19,7 @@ import pandas as pd
import patsy
from scipy import stats
from scipy.special import expit
import statsmodels, statsmodels.miscmodels.ordinal_model
import statsmodels, statsmodels.base.model, statsmodels.miscmodels.ordinal_model
import statsmodels.api as sm
from statsmodels.iolib.table import SimpleTable
from statsmodels.stats.outliers_influence import variance_inflation_factor
@ -513,7 +513,8 @@ class RegressionResult:
right_col.append(('No. Observations:', format(self.nobs, '.0f')))
if self.nevents:
right_col.append(('No. Events:', format(self.nevents, '.0f')))
right_col.append(('Df. Model:', format(self.dof_model, '.0f')))
if self.dof_model:
right_col.append(('Df. Model:', format(self.dof_model, '.0f')))
if self.dof_resid:
right_col.append(('Df. Residuals:', format(self.dof_resid, '.0f')))
if self.rsquared:
@ -943,7 +944,7 @@ def regress(
if model_class is sm.OLS:
method_name = 'Least Squares'
elif model_class is sm.PHReg:
method_name = 'MPLE'
method_name = 'MLE'
else:
# Parse raw regression results to get fit method
# Avoid this in general as it can be expensive to summarise all the post hoc tests, etc.
@ -1013,6 +1014,59 @@ def logit_then_regress(model_class, df, dep, formula, *, nan_policy='warn', **kw
# Perform desired regression
return regress(model_class, df, dep, formula, start_params=logit_params, **kwargs)
class _Dummy: pass
def _wald_test(param_names, params, formula, vcov, df):
# Hack! Piggyback off statsmodels to compute a Wald test
lmr = statsmodels.base.model.LikelihoodModelResults(model=None, params=None)
lmr.model = _Dummy()
lmr.model.data = _Dummy()
lmr.model.data.cov_names = param_names
lmr.params = params
lmr.df_resid = df
return lmr.wald_test(formula, cov_p=vcov, use_f=False, scalar=True)
class BrantResult:
"""
Result of a Brant test for ordinal regression
See :meth:`RegressionResult.brant`.
"""
def __init__(self, tests):
#: Results for Brant test on each coefficient (*Dict[str, ChiSquaredResult]*)
self.tests = tests
def __repr__(self):
if config.repr_is_summary:
return self.summary()
return super().__repr__()
def _repr_html_(self):
out = '<table><caption>Brant Test Results</caption><thead><tr><th></th><th style="text-align:center"><i>χ</i><sup>2</sup></th><th style="text-align:center">df</th><th style="text-align:center"><i>p</i></th></thead><tbody>'
for raw_name, test in self.tests.items():
out += '<tr><th>{}</th><td>{:.2f}</td><td>{:.0f}</td><td style="text-align:left">{}</td></tr>'.format(raw_name, test.statistic, test.dof, fmt_p(test.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
out += '</tbody></table>'
return out
def summary(self):
"""
Return a stringified summary of the *χ*:sup:`2` test
:rtype: str
"""
table = pd.DataFrame([
['{:.2f}'.format(test.statistic), '{:.0f}'.format(test.dof), fmt_p(test.pvalue, PValueStyle.TABULAR)]
for test in self.tests.values()
], index=self.tests.keys(), columns=['χ² ', 'df', 'p '])
return str(table)
# -----------------------------
# Penalised logistic regression
@ -1155,56 +1209,3 @@ class OrdinalLogit(statsmodels.miscmodels.ordinal_model.OrderedModel):
def transform_reverse_threshold_params(self, params):
return params[:-1]
class _Dummy: pass
def _wald_test(param_names, params, formula, vcov, df):
# Hack! Piggyback off statsmodels to compute a Wald test
lmr = statsmodels.base.model.LikelihoodModelResults(model=None, params=None)
lmr.model = _Dummy()
lmr.model.data = _Dummy()
lmr.model.data.cov_names = param_names
lmr.params = params
lmr.df_resid = df
return lmr.wald_test(formula, cov_p=vcov, use_f=False, scalar=True)
class BrantResult:
"""
Result of a Brant test for ordinal regression
See :meth:`RegressionResult.brant`.
"""
def __init__(self, tests):
#: Results for Brant test on each coefficient (*Dict[str, ChiSquaredResult]*)
self.tests = tests
def __repr__(self):
if config.repr_is_summary:
return self.summary()
return super().__repr__()
def _repr_html_(self):
out = '<table><caption>Brant Test Results</caption><thead><tr><th></th><th style="text-align:center"><i>χ</i><sup>2</sup></th><th style="text-align:center">df</th><th style="text-align:center"><i>p</i></th></thead><tbody>'
for raw_name, test in self.tests.items():
out += '<tr><th>{}</th><td>{:.2f}</td><td>{:.0f}</td><td style="text-align:left">{}</td></tr>'.format(raw_name, test.statistic, test.dof, fmt_p(test.pvalue, PValueStyle.TABULAR | PValueStyle.HTML))
out += '</tbody></table>'
return out
def summary(self):
"""
Return a stringified summary of the *χ*:sup:`2` test
:rtype: str
"""
table = pd.DataFrame([
['{:.2f}'.format(test.statistic), '{:.0f}'.format(test.dof), fmt_p(test.pvalue, PValueStyle.TABULAR)]
for test in self.tests.values()
], index=self.tests.keys(), columns=['χ² ', 'df', 'p '])
return str(table)

View File

@ -14,12 +14,17 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import numpy as np
from scipy import stats
import statsmodels.api as sm
from .config import config
from .sig_tests import ChiSquaredResult
from .utils import check_nan
from .regress import RegressionResult, SingleTerm
from .utils import Estimate, check_nan, cols_for_formula, convert_pandas_nullable
from datetime import datetime
import weakref
def kaplanmeier(df, time, status, by=None, *, ci=True, transform_x=None, transform_y=None, nan_policy='warn'):
"""
@ -271,3 +276,91 @@ def logrank(df, time, status, by, nan_policy='warn'):
statistic, pvalue = sm.duration.survdiff(df[time], df[status], df[by])
return ChiSquaredResult(statistic=statistic, dof=1, pvalue=pvalue)
# --------------------------------
# Interval-censored Cox regression
def cox_interval_censored(
df, time_left, time_right, formula, *,
bootstrap_samples=100,
nan_policy='warn',
bool_baselevels=False, exp=True,
):
# TODO: Documentation
df_ref = weakref.ref(df)
# Check for/clean NaNs in input columns
columns = [time_left, time_right] + cols_for_formula(formula, df)
df = df[columns]
df = check_nan(df, nan_policy)
# FIXME: Ensure numeric type for dependent variable
#df[dep], dep_categories = as_numeric(df[dep])
if df[time_left].dtype != 'float64' or df[time_right].dtype != 'float64':
raise NotImplementedError('Time dtypes must be float64')
# Convert pandas nullable types for independent variables
df = convert_pandas_nullable(df)
# ---------
# Fit model
# lifelines.CoxPHFitter doesn't do confidence intervals so we use R
import rpy2.robjects as ro
import rpy2.robjects.packages
import rpy2.robjects.pandas2ri
# Convert bool to int otherwise rpy2 chokes
df = df.replace({False: 0, True: 1})
# Import icenReg
ro.packages.importr('icenReg')
with ro.conversion.localconverter(ro.default_converter + ro.pandas2ri.converter):
with ro.local_context() as lc:
# Convert DataFrame to R
lc['df'] = df
# Transfer other parameters to R
lc['formula_'] = 'Surv({}, {}, type="interval2") ~ {}'.format(time_left, time_right, formula)
lc['bootstrap_samples'] = bootstrap_samples
# FIXME: Seed bootstrap RNG?
# Fit the model
ro.r('model <- ic_sp(as.formula(formula_), data=df, bs_samples=bootstrap_samples)')
model = ro.r('model')
# Hard to access attributes through rpy2
term_parameters = ro.r('model$coef')
term_names = ro.r('names(model$coef)')
term_cis = ro.r('confint(model)')
cov_matrix = ro.r('model$var')
llf = ro.r('model$llk')[0]
# TODO: Handle categorical terms?
terms = {}
for i in range(len(term_parameters)):
# These values not directly exposed so we must calculate them
se = np.sqrt(cov_matrix[i, i])
pvalue = 2 * stats.norm(loc=0, scale=se).cdf(-np.abs(term_parameters[i]))
term = SingleTerm(term_names[i], Estimate(term_parameters[i], term_cis[i][0], term_cis[i][1]), pvalue)
terms[term_names[i]] = term
result = RegressionResult(
None, df_ref, '({}, {}]'.format(time_left, time_right), formula, nan_policy, None, None,
model,
'Interval-Censored Cox Regression', 'CoxIC', 'MLE',
len(df), None, None, datetime.now(), 'Bootstrap',
terms,
llf, None,
None, None, None,
[],
exp
)
return result