Implement yli.Cox

This commit is contained in:
RunasSudo 2023-07-16 16:22:34 +10:00
parent d17412ca07
commit fd7384f810
Signed by: RunasSudo
GPG Key ID: 7234E476BF21C61A
2 changed files with 65 additions and 12 deletions

View File

@ -20,7 +20,7 @@ from .descriptives import auto_correlations, auto_descriptives
from .distributions import beta_oddsratio, beta_ratio, hdi, transformed_dist
from .graphs import init_fonts, HorizontalEffectPlot
from .io import pickle_read_compressed, pickle_read_encrypted, pickle_write_compressed, pickle_write_encrypted
from .regress import GLM, IntervalCensoredCox, Logit, OLS, OrdinalLogit, PenalisedLogit, Poisson, regress, vif
from .regress import Cox, GLM, IntervalCensoredCox, Logit, OLS, OrdinalLogit, PenalisedLogit, Poisson, regress, vif
from .sig_tests import anova_oneway, auto_univariable, chi2, mannwhitney, pearsonr, spearman, ttest_ind, ttest_ind_multiple
from .survival import kaplanmeier, logrank, turnbull
from .utils import as_ordinal

View File

@ -101,7 +101,7 @@ def regress(
model_class, df, dep, formula,
*,
nan_policy='warn',
exposure=None, family=None,
exposure=None, family=None, status=None,
method=None, maxiter=None, start_params=None, tolerance=None,
reduced=None,
bool_baselevels=False, exp=None
@ -123,6 +123,8 @@ def regress(
:type exposure: str
:param family: See :class:`yli.GLM`
:type family: str
:param status: See :class:`yli.Cox`
:type status: str
:param method: See statsmodels *model.fit*
:param maxiter: See statsmodels *model.fit*
:param start_params: See statsmodels *model.fit*
@ -141,15 +143,17 @@ def regress(
if not any(x.__name__ == 'RegressionModel' for x in model_class.__bases__):
raise ValueError('model_class must be a RegressionModel')
df_ref = weakref.ref(df)
dmatrices, dep_categories = df_to_dmatrices(df, dep, formula, nan_policy)
# Additional columns to check for NaN - exposure, status, etc.
additional_columns = []
# Build function call arguments
fit_kwargs = {}
if exposure is not None:
fit_kwargs['exposure'] = exposure
additional_columns.append(exposure)
if family is not None:
fit_kwargs['family'] = family
if status is not None:
additional_columns.append(status)
if method is not None:
fit_kwargs['method'] = method
if maxiter is not None:
@ -161,6 +165,16 @@ def regress(
if reduced is not None:
fit_kwargs['reduced'] = reduced
# Preprocess data, check for NaN and get design matrices
df_ref = weakref.ref(df)
df_clean, dmatrices, dep_categories = df_to_dmatrices(df, dep, formula, nan_policy, additional_columns)
# Add function call arguments for supplemental columns - exposure, status, etc.
if exposure is not None:
fit_kwargs['exposure'] = df_clean[exposure]
if status is not None:
fit_kwargs['status'] = df_clean[status]
# Fit model
result = model_class.fit(dmatrices[0], dmatrices[1], **fit_kwargs)
@ -197,7 +211,7 @@ def regress(
result.terms[raw_name].categories[term] = cutoff_term
else:
# Parse if required
factor, column, contrast = parse_patsy_term(formula, df, raw_name)
factor, column, contrast = parse_patsy_term(formula, df_clean, raw_name)
if contrast is not None:
# Categorical term
@ -208,7 +222,7 @@ def regress(
else:
# Add a new categorical term if not exists
if column not in result.terms:
ref_category = formula_factor_ref_category(formula, df, factor)
ref_category = formula_factor_ref_category(formula, df_clean, factor)
result.terms[column] = CategoricalTerm({}, ref_category)
result.terms[column].categories[contrast] = raw_term
@ -218,10 +232,13 @@ def regress(
return result
def df_to_dmatrices(df, dep, formula, nan_policy):
def df_to_dmatrices(df, dep, formula, nan_policy, additional_columns=[]):
# Check for/clean NaNs in input columns
columns = cols_for_formula(dep, df) + cols_for_formula(formula, df)
if additional_columns:
columns += additional_columns
df = df[columns]
df = check_nan(df, nan_policy)
@ -237,7 +254,7 @@ def df_to_dmatrices(df, dep, formula, nan_policy):
# Construct design matrix from formula
dmatrices = patsy.dmatrices(dep + ' ~ ' + formula, df, return_type='dataframe')
return dmatrices, dep_categories
return df, dmatrices, dep_categories
class RegressionModel:
# TODO: Documentation
@ -630,19 +647,55 @@ class CategoricalTerm:
#: Name of the reference category (*str*)
self.ref_category = ref_category
def raw_terms_from_statsmodels_result(raw_result):
def raw_terms_from_statsmodels_result(raw_result, *, wrapped=True):
if wrapped:
zipped_iter = zip(raw_result.model.exog_names, raw_result.params.values, raw_result.conf_int(config.alpha).values, raw_result.pvalues.values)
else:
zipped_iter = zip(raw_result.model.exog_names, raw_result.params, raw_result.conf_int(config.alpha), raw_result.pvalues)
return {
('(Intercept)' if raw_name == 'Intercept' else raw_name): SingleTerm(
raw_name=raw_name,
beta=Estimate(raw_param, raw_ci[0], raw_ci[1]),
pvalue=raw_p
)
for raw_name, raw_param, raw_ci, raw_p in zip(raw_result.model.exog_names, raw_result.params.values, raw_result.conf_int(config.alpha).values, raw_result.pvalues.values)
for raw_name, raw_param, raw_ci, raw_p in zipped_iter
}
# ------------------------
# Concrete implementations
class Cox(RegressionModel):
# TODO: Documentation
@property
def model_long_name(self):
return 'Cox Regression'
@property
def model_short_name(self):
return 'Cox'
@classmethod
def fit(cls, data_dep, data_ind, *, status=None):
# Drop explicit intercept term
if 'Intercept' in data_ind:
del data_ind['Intercept']
result = cls()
result.exp = True
result.cov_type = 'nonrobust'
# Perform regression
raw_result = sm.PHReg(endog=data_dep, exog=data_ind, status=status, missing='raise').fit(disp=False)
result.ll_model = raw_result.llf
result.terms = raw_terms_from_statsmodels_result(raw_result, wrapped=False)
result.vcov = raw_result.cov_params()
return result
class GLM(RegressionModel):
# TODO: Documentation
@ -1093,7 +1146,7 @@ class OrdinalLogit(RegressionModel):
dep = self.dep
# Precompute design matrices
dmatrices, dep_categories = df_to_dmatrices(df, dep, self.formula, 'omit')
_, dmatrices, dep_categories = df_to_dmatrices(df, dep, self.formula, 'omit')
s_dep = dmatrices[0][dmatrices[0].columns[0]] # df[dep] as series - must get this from dmatrices to account for as_numeric
dmatrix_right = dmatrices[1]
dmatrix_right.reset_index(drop=True, inplace=True) # Otherwise this confuses matrix multiplication