Source code for pymantra.network.ld_estimation

"""
General options for improvement:
* different linear models (incl. regularized ones)
* different ways of correcting
"""
import sys
import warnings
import numpy as np
import pandas as pd
import networkx as nx
from tqdm import tqdm
from scipy.stats import f
from typing import Tuple, Dict, List, NamedTuple, Union, Callable
from sklearn.base import RegressorMixin
from sklearn.preprocessing import StandardScaler
# TODO: allow for ridge, lasso and/or elastic net?
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from scipy.stats import ttest_ind, wilcoxon, kstest, norm
from multiprocessing import Process, Manager
with warnings.catch_warnings() as w:
    warnings.simplefilter("ignore")
    # outdated is a little annoying here
    from pingouin import multivariate_normality, multivariate_ttest
from statsmodels.regression.mixed_linear_model import MixedLM
from statsmodels.tools.sm_exceptions import ConvergenceWarning

from pymantra.network import reaction_graph_extraction
from pymantra.network.exceptions import R2Warning
from pymantra.network.non_parametric_multivariate import multivariate_rank_test


class LinearModel(NamedTuple):
    r"""
    Holding the results of a linear model between substrate
    and product intensities.

    substrates: List[str]
        Name of the substrates of the represented reaction
    products: List[str]
        Name of the products of the represented reaction
    model_fit : np.ndarray
        Normed residuals for all samples
    Model : RegressorMixin
        Underlying regression model
    nll : np.ndarray
        Negative log-likelihood aggregated over all products
        of the respective reaction
    r2 : np.ndarray
        :math:`R^2` per product
    f_statistics : np.ndarray
        f-statistics per product
    ftest_pvalue: np.ndarray
        f-test p-value per product
    active : bool
        Indicating whether the 'threshold' for an active
        reaction was met
    outlier_samples: list
        Outliers removed from the training data
    model_fit_type : str, "unset"
        The type of metric used for "model_fit". In most cases
        this would be "residuals", "normed_residuals" or
        "explained_variance"
    """
    substrates: list
    products: list

    model_fit: np.ndarray
    model: RegressorMixin
    nll: np.ndarray
    r2: np.ndarray
    f_statistics: np.ndarray
    ftest_pvalue: np.ndarray
    active: bool
    outlier_samples: list
    model_fit_type: str = "unset"


def _nll(X, Y, y_hat):
    # sum of squared distances
    ssd = np.sum((Y - y_hat) ** 2, axis=0)
    n_observ = X.shape[0] / 2
    # Negative log-likelihood
    # formula derived from the assumption of a scale of 1 and the log
    # pdf of a normal distribution
    return -n_observ * (np.log(2 * np.pi) + np.log(ssd / X.shape[0]) + 1)


def _r_square(X, Y, y_hat):
    # average residual sum of squares
    var_fit = np.sum((Y - y_hat) ** 2, axis=0) / X.shape[0]
    # average total sum of squares
    var_mean = np.sum((Y - np.mean(Y, axis=0)) ** 2, axis=0) / X.shape[0]
    return (var_mean - var_fit) / var_mean


def _test_explained_variances(var_ctr, var_case, alpha=.05):
    if var_ctr.ndim == 2 and var_ctr.shape[1] > 1:
        # multivariate case
        is_normal = multivariate_normality(var_ctr, alpha).normal and \
                        multivariate_normality(var_case, alpha).normal
        if is_normal:
            return multivariate_ttest(var_ctr, var_case)["p-val"]
        else:
            try:
                # TODO: implement this
                return multivariate_rank_test(var_ctr, var_case).pvalue
            except NotImplementedError:
                warnings.warn(
                    "Non-parametric multivariate test is currently not "
                    "implemented! Defaulting to parametric test, despite "
                    "non-normality."
                )
                return multivariate_ttest(var_ctr, var_case)["p-val"]
    # univariate case
    ctr_normal = kstest(var_ctr, norm.cdf).pvalue
    case_normal = kstest(var_case, norm.cdf).pvalue
    if ctr_normal and case_normal:
        return ttest_ind(var_ctr, var_case, nan_policy="omit")[1]
    else:
        return wilcoxon(var_ctr, var_case, nan_policy="omit"[1])


def _fstats(
    X, Y, y_hat, as_probability: bool = True,
    ssd_simple: np.ndarray = None, p_simple: int = None,
    norm_multiple: bool = False
):
    if ssd_simple is None:
        if norm_multiple:
            dist = np.linalg.norm(Y - np.mean(Y, axis=0), axis=1)
            ssd_simple = np.sum(dist ** 2, axis=0)
        else:
            ssd_simple = np.sum((Y - np.mean(Y, axis=0)) ** 2, axis=0)
        p_simple = 1
    elif p_simple is None:
        raise ValueError(
            "'p_simple' must be given, when 'ssd_simple' is given."
        )
    if norm_multiple:
        dist = np.linalg.norm(Y - y_hat, axis=1)
        ssd_fit = np.sum(dist ** 2, axis=0)
    else:
        ssd_fit = np.sum((Y - y_hat) ** 2, axis=0)
    df1 = X.shape[1] - p_simple
    df2 = X.shape[0] - X.shape[1]
    fstats = ((ssd_simple - ssd_fit) / df1) / (ssd_fit / df2)
    if as_probability:
        # probabilities from f-statistic
        return fstats, f.sf(fstats, df1, df2)
    return fstats


def _cooks_distance(y, yhat, X, p: [int, str] = 2):
    # for details on cooks' distance see
    # https://en.wikipedia.org/wiki/Cook%27s_distance
    leverage_mat = X * np.linalg.pinv(X).T
    leverage = leverage_mat.sum(axis=1)

    # NOTE: rank used instead of number of variables
    df = X.shape[0] - np.linalg.matrix_rank(X)
    res = _normed_residuals(y, yhat, p)
    # square root of MSE
    # NOTE: we use the p-norm here to get one distance per sample
    me = np.sqrt(res.dot(res) / df)
    # final distance calculation
    res_norm = res / (me * np.sqrt(1 - leverage))
    dist = (res_norm ** 2 / X.shape[1]) * (leverage / (1 - leverage))

    return dist, df


def _normed_residuals(y, y_hat, p=2, *args, **kwargs):
    """Compute the p-norm for each residual

    Parameters
    ----------
    y: np.ndarray
        True labels/target variables
    y_hat: np.ndarray
        Predicted labels/target variables
    p: Union[int, float, np.inf]
        p-norm to use
    args
        Just provided for compatibility with other function signatures
    kwargs
        Just provided for compatibility with other function signatures

    Returns
    -------
    np.ndarray
        Normed residuals per sample
    """
    if y.ndim > 1:
        return np.linalg.norm(y_hat - y, ord=p, axis=1) / y.shape[1]
    else:
        return y_hat - y


def _explained_variance(y, y_hat, *args, **kwargs):
    """Compute the explained variance per samples

    The explained variance is computed as the ratio of the residual sum of
    squares and the total sum of squares.

    Parameters
    ----------
    y: np.ndarray
        True labels/target variables
    y_hat: np.ndarray
        Predicted labels/target variables
    args
        Just provided for compatibility with other function signatures
    kwargs
        Just provided for compatibility with other function signatures

    Returns
    -------
    np.ndarray
        Explained variance per sample
    """
    # residual sum of squares
    res_ss = ((y - y_hat) ** 2).sum(axis=1)
    # total sum of squares
    total_ss = ((y - y.mean(axis=0)) ** 2).sum(axis=1)
    # explained variance as ratio between residual and total sum of squares
    expl_var = 1 - (res_ss / (total_ss + 1e-4))
    # preventing negative values since res / total is in (-inf, 1]
    expl_var[expl_var < 1e-4] = 1e-4
    return expl_var


def _compute_model(
    X: np.ndarray, Y: np.ndarray, residual_summary: Callable,
    model: RegressorMixin = None,
    substrates: list = None, products: list = None,
    p_tresh: float = 0.05, p: int = 2, simple_model: RegressorMixin = None,
    simple_data: np.ndarray = None, norm_multiple: bool = False,
    outlier_threshold: float = None, r2_threshold: float = .5,
    r2_warning: Callable = lambda: None, model_class=LinearRegression, **kwargs
) -> Union[LinearModel, None]:
    r"""

    The per-feature negative log-likelihood of the model is computed
    as the concentrated negative log-likelihood

    .. math::
        NLL(\theta) &= - \frac{N}{2} (log(2\pi) + log(\frac{SSR}{N} + 1)

    Parameters
    ----------
    X : np.ndarray
        Data of the explanatory variables. This reflects the substrate data,
        already corrected for all possible covariates.
    Y : np.ndarray
        Data of the target variables. This reflects the product data,
        already corrected for all possible covariates.
    residual_summary: Callable
        Function to compute a residual summary statistic
    model : LinearRegression, optional
        Linear model to use. Any class with a scikit-learn like API
        implementing `fit` and `predict` can be used here.
    substrates : list, optional
        Substrates to subset X by.
    products : list, optional
        Products to subset X by.
    p_tresh : float, default 0.05
        p-value threshold
    p : int, default 2
        lp-norm to use, only relevant when `residual_summary` is the p-norm
        function
    simple_model : LinearRegression, optional
        Simple baseline model to compare the actual model against for p-value
        computation
    simple_data : np.ndarray, optional
        Simple baseline data to compare the actual model against for p-value
        computation
    norm_multiple : bool, default False
        norm multivariable residuals
    outlier_threshold : float | bool, default None
        Cook's distance threshold for outlier detection. If `False`, no outlier
        removal will be performed
    r2_threshold : float, default 0.5
        Minimum :math:`R^2` the model needs to meet to not be
        discarded. If all models should be taken set to None.
    r2_warning: Callable, optional
        Function to raise a warning when the r2-threshold is not passed
    kwargs
        Optional keyword arguments for
        :py:class:`sklearn.linear_model.LinearRegression`

    Returns
    -------
    LinearModel | None
        Fitted linear model and negative log-likelihood as the mean NLL
        over all features. If model r2 restriction is set and not met
        None will be returned.
    """
    if model is None:
        model = model_class(**kwargs)
        model.fit(X, Y)
    # model prediction
    # e.g model.intercept_ + X @ model.coef_.T for a linear regression
    y_hat = model.predict(X)
    # cook's distance for outlier detection
    if isinstance(outlier_threshold, bool) and not outlier_threshold:
        has_outliers = False
    else:
        cook_dist, df = _cooks_distance(Y, y_hat, X, p)
        if outlier_threshold is None:
            # TODO: what's the better threshold here?
            # outlier_threshold = 4 / X.shape[0]
            outlier_threshold = f.sf(cook_dist, X.shape[1], df)
        outliers = cook_dist > outlier_threshold
        has_outliers = np.any(outliers)
    if has_outliers:
        xn = X[~outliers]
        yn = Y[~outliers]
        model.fit(xn, yn)
        y_hat = model.predict(xn)
    else:
        xn = X
        yn = Y
    # checking general model fit
    r2_val = r2_score(yn, y_hat)
    if r2_val < r2_threshold:
        r2_warning(r2_val)
        return
    # per feature nll => 'total' NLL is the mean over all features
    nll = _nll(xn, yn, y_hat)
    # TODO: we need to ensure that all values are above a certain
    #       threshold, since our requirement is that all products are
    #       predictable from the input
    # => compute all individual F-statistics + prob => test
    if simple_model:
        y_simple = simple_model.predict(simple_data[~outliers])
        ssd_simple = np.sum((yn - y_simple) ** 2, axis=0)
        if simple_model.coef_.ndim == 1:
            p_simple = simple_model.coef_.size
        else:
            p_simple = simple_model.coef_.shape[1]
        fstats, pvals = _fstats(xn, yn, y_hat, as_probability=True,
                                ssd_simple=ssd_simple,
                                p_simple=p_simple,
                                norm_multiple=norm_multiple)
        active = np.all(pvals < p_tresh)
    else:
        fstats, pvals = _fstats(xn, yn, y_hat, as_probability=True,
                                norm_multiple=norm_multiple)
        active = np.all(pvals < p_tresh)
    # TODO: additional test: hotelling
    # normed residuals normalized by the number of features
    res = np.zeros(X.shape[0])
    res[~outliers] = residual_summary(yn, y_hat, p=p)
    res[outliers] = np.nan

    return LinearModel(
        substrates, products, res, model, nll, _r_square(xn, yn, y_hat),
        fstats, pvals, active, list(np.where(outliers)[0])
    )


def _ld_models(
    reaction_nodes: Dict[str, Tuple[List[str], List[str]]],
    data: pd.DataFrame, residual_summary: Callable,
    case_data: pd.DataFrame = None, n_threads: int = 1,
    r2_warning: Callable = lambda x: lambda y: None,
    recompute_non_passing: bool = False, **kwargs
) -> Tuple[Dict[str, LinearModel], Dict[str, np.ndarray]]:
    if case_data is not None:
        def _model(
            sub_data, prod_data, reac, subs, prods, model_dict, res, inv=False
        ):
            if len(subs) == len(prods) and np.all(np.isin(subs, prods)):
                return
            lm = _compute_model(
                sub_data, prod_data,
                residual_summary,
                substrates=subs,
                products=prods,
                r2_warning=r2_warning(reac),
                **kwargs
            )
            if not lm:
                if not recompute_non_passing or inv:
                    return
                return _model(
                    case_data.loc[:, subs].values,
                    case_data.loc[:, prods].values,
                    reac, subs, prods, model_dict, res, True
                )
            use_data = data if inv else case_data
            model_dict[reac] = lm
            case_subs = use_data.loc[:, subs].values
            # NOTE: the RegressorMixin base class **always** implements a
            #       `predict` method
            case_pred = lm.model.predict(case_subs)
            normed_res = residual_summary(
                case_pred, use_data.loc[:, prods], p=kwargs.get("p", 2))
            res[reac] = normed_res
    else:
        def _model(sub_data, prod_data, reac, subs, prods, model_dict, _):
            if len(subs) == len(prods) and np.all(np.isin(subs, prods)):
                return
            model = _compute_model(
                sub_data, prod_data,
                residual_summary,
                substrates=subs,
                products=prods,
                r2_warning=r2_warning(reac),
                **kwargs
            )
            if model:
                model_dict[reac] = model
    if n_threads == 1:
        models = {}
        residuals = {}
        for reaction, (substrates, products) in reaction_nodes.items():
            _model(
                data.loc[:, substrates].values,
                data.loc[:, products].values,
                reaction, substrates,
                products, models, residuals
            )
    else:
        # FIXME: setting thread number correctly
        print("parallel option used")
        manager = Manager()
        residuals = manager.dict()
        models = manager.dict()
        # pool = manager.Pool()
        jobs = []
        for reaction, (substrates, products) in reaction_nodes.items():
            substrate_data = data.loc[:, substrates]
            product_data = data.loc[:, products]
            jobs.append(Process(
                target=_model,
                args=(substrate_data, product_data, reaction, substrates,
                      products, models, residuals)
            ))
        _ = []
        # FIXME: correct join operation
    return models, residuals


def _neighbour_models(
    graph: nx.DiGraph, initial_models: Dict[str, LinearModel],
    metabolome_data: pd.DataFrame, **kwargs
) -> Dict[str, LinearModel]:
    # extract reaction-reaction connection
    reaction_graph = reaction_graph_extraction(graph, include_attributes=False)
    # turn into adjacency list
    adjacency_list = {}
    for (src, tgt) in reaction_graph:
        adjacency_list.setdefault(src, set()).add(tgt)
        adjacency_list.setdefault(tgt, set()).add(src)
    neighbour_models = {}
    # compute neighbour-included models
    # TODO: parallelized option
    # TODO: what should we really 'correct' for here:
    #   * metabolites without direction to connection to products but to
    #     substrates
    #   * reaction-chain, i.e. mediating model A -> B -> C
    # TODO: outlier detection
    for reaction, model in initial_models.items():
        reaction_substrates = set(graph.predecessors(reaction))
        reaction_products = list(graph.successors(reaction))
        # NOTE: this cannot raise a KeyError unless there's an
        #       error in the package. In the case of incorrect
        #       node/edge type annotation reaction graph extraction
        #       will already throw an error
        neighbours = adjacency_list.get(reaction)
        # TODO: there have two options here
        #   1. correct data for neighbouring substrate
        #   2. 'joint' linear model and then check for coefficients of reaction
        # also note that we only use reaction substrates, but not the
        # products of adjacent reactions
        adj_substrates = set()
        if neighbours is not None:
            for neighbour in neighbours:
                if initial_models[neighbour].active:
                    adj_substrates.update(set(graph.predecessors(neighbour)))
            substrates = list(adj_substrates.union(reaction_substrates))
            substrate_data = metabolome_data.loc[:, substrates]
            product_data = metabolome_data.loc[:, reaction_products]
            # computing the linear model
            # TODO: pass the initial model (i.e. `model`) and compute
            #       f-statistics and p-value as the residual difference
            #       between the models

            sub_data = metabolome_data.loc[:, list(reaction_substrates)].values
            model = _compute_model(
                substrate_data, product_data,
                substrates=substrates,
                products=reaction_products,
                simple_model=model.model,
                simple_data=sub_data,
                **kwargs
            )
            # storing the results
            neighbour_models[reaction] = model
        else:
            neighbour_models[reaction] = LinearModel(
                [], [], np.array([]), LinearRegression(),
                np.array([]), np.array([]), np.array([]), np.ndarray([]),
                active=model.active, outlier_samples=[]
            )
    return neighbour_models


def confounder_correction(
    metabolome_data: pd.DataFrame, covariates: pd.DataFrame,
    random_effects: Union[str, List[str]] = None, **kwargs
) -> pd.DataFrame:
    """Correct for confounder using linear (mixed effect) models

    Perform a confounder correction by fitting a linear model or linear mixed
    effect model for each metabolite (as the target variable). The residual,
    representing the variance unexplained by the confounding factors, are
    returned as the confounder corrected data.

    For linear mixed effect models, only simple settings without interaction
    between variables are supported. If you have a more complex setup, we
    recommend correcting outside `pymantra` and using the corrected data
    for your analysis.

    Parameters
    ----------
    metabolome_data: pd.DataFrame
        Metabolomics data to correct with samples in rows and features in
        columns
    covariates: pd.DataFrame
        Confounder data with samples in rows and variables in columns
    random_effects: str | List[str]
        Random effect variables. If no random effects should be used (i.e.
        a 'classical' linear model instead of a linear mixed effect model) pass
        None.
    kwargs
        Optional keyword arguments to pass to :py:func:`MixedLM.from_formula`.

    Returns
    -------
    pd.DataFrame
        Confounder corrected data with the same shape, indices and columns as
        `metabolome_data`.
    """
    if random_effects is None:
        corr_model = LinearRegression()
        corr_model.fit(covariates, metabolome_data)
        pred_vals = corr_model.predict(metabolome_data.values)
        corr_residuals = pd.DataFrame(
            metabolome_data.values - pred_vals,
            index=metabolome_data.index, columns=metabolome_data.columns
        )
    else:
        # statsmodel cannot handle variables in formulas, which contain
        # whitespaces. Hence, we change feature names and reset them after
        # correction
        metabolites = metabolome_data.columns
        metabolome_data.columns = [
            f"s{i + 1}" for i in np.arange(metabolome_data.shape[1])]
        # same with covariate names
        covariates.columns = [x.replace(" ", "_") for x in covariates.columns]
        random_effects = [x.replace(" ", "_") for x in random_effects]

        lmm_data = pd.concat([metabolome_data, covariates], axis=1)

        fixed_effects = " + ".join(
            covariates.columns[~np.isin(covariates.columns, random_effects)])

        if isinstance(random_effects, str):
            random_effects = [random_effects]
        # this looks a bit hacky, but is the only way I manged to reproduce the
        # lmer behavior
        if len(random_effects) == 1:
            # single random effect variable
            def lmm(metabolite):
                model = MixedLM.from_formula(
                    f"{metabolite} ~ {fixed_effects}", lmm_data,
                    groups=lmm_data[random_effects[0]], **kwargs
                )
                return model.fit()
        else:
            # multiple independent random effects
            vcf = {
                rand_effect: f"0 + C({rand_effect})"
                for rand_effect in random_effects
            }
            lmm_data["Group"] = 1

            def lmm(metabolite):
                model = MixedLM.from_formula(
                    f"{metabolite} ~ {fixed_effects}", groups="Group",
                    vc_formula=vcf, data=lmm_data, **kwargs
                )
                return model.fit()

        corr_res = np.zeros(metabolome_data.shape)
        # TODO: parallelize and make more efficient
        with warnings.catch_warnings():
            # Parameter is often on the boundary
            warnings.simplefilter("ignore", ConvergenceWarning)
            desc = "Confounder Correction"
            for i, met in enumerate(tqdm(metabolome_data.columns, desc=desc)):
                try:
                    res = lmm(met)
                    corr_res[:, i] = res.resid
                # this is most likely a singular covariance matrix error
                except ValueError as err:
                    tb = sys.exc_info()[2]
                    raise ValueError(
                        f"Error correcting {met}: {err}"
                    ).with_traceback(tb)
        corr_residuals = pd.DataFrame(
            corr_res, index=metabolome_data.index, columns=metabolites)

    return corr_residuals


def _prep_data(
    graph, metabolome_data, covariates, scale, random_effects, **kwargs
):
    reaction_nodes = {}
    for node, ntype in nx.get_node_attributes(graph, 'node_type').items():
        if graph.nodes[node]['node_type'] == 'reaction':
            substrates = [
                sub for sub in graph.predecessors(node)
                if graph.nodes[sub]['node_type'] == 'metabolite'
            ]
            products = [
                prod for prod in graph.successors(node)
                if graph.nodes[prod]['node_type'] == 'metabolite'
            ]
            if substrates and products:
                reaction_nodes[node] = (substrates, products)

    # TODO: check if axis sizes match up
    if covariates is not None:
        corr_residuals = confounder_correction(
            metabolome_data, covariates, random_effects, **kwargs)
    else:
        corr_residuals = metabolome_data

    if scale:
        corr_residuals = pd.DataFrame(
            StandardScaler().fit_transform(corr_residuals.to_numpy()),
            index=corr_residuals.index, columns=corr_residuals.columns
        )

    return corr_residuals, reaction_nodes


def ld_estimation(
    graph: nx.DiGraph, metabolome_data: pd.DataFrame,
    covariates: pd.DataFrame = None, scale: bool = True,
    random_effects: Union[str, List[str]] = None, **kwargs
) -> Tuple[pd.DataFrame, Dict[str, LinearModel], Dict[str, LinearModel]]:
    warnings.warn(
        "'ld_estimation' is currently not supported! Please use "
        "'per_sample_ld_estimation'"
    )
    corr_residuals, reaction_nodes = _prep_data(
        graph, metabolome_data, covariates, scale, random_effects)

    # first step: compute linear models per reaction
    initial_models, _ = _ld_models(reaction_nodes, corr_residuals, **kwargs)
    # second step: including 1-hop neighbours, that are connected via 'active'
    #              reactions
    neighbour_models = _neighbour_models(graph, initial_models, corr_residuals,
                                         **kwargs)
    # extract the reaction values per sample
    initial_values = pd.DataFrame.from_dict(
        {reaction: pd.Series(model.model_fit)
         for reaction, model in initial_models.items()},
        orient="index"
    )
    neighbour_values = pd.DataFrame.from_dict(
        {reaction: pd.Series(model.model_fit)
         for reaction, model in neighbour_models.items()},
        orient="index"
    )
    res_diff = initial_values.abs() - neighbour_values.abs()
    return res_diff, neighbour_models, initial_models


def _scale_1D(x: np.ndarray, params: Tuple[float, float] = None):
    if params is None:
        params = (np.nanmean(x), np.nanstd(x))
        return (x - params[0]) / params[1], params
    return (x - params[0]) / params[1]


[docs]def per_sample_ld_estimation( graph: nx.DiGraph, metabolome_data: pd.DataFrame, groups: pd.Series, covariates: pd.DataFrame = None, compute_expl_var: bool = False, var_as_pval: bool = False, combined_models: bool = False, residual_summary: str = "expl_var", scale: bool = True, control_group: str = None, r2_threshold: float = .5, recompute_non_passing: bool = False, outlier_threshold: float = None, random_effects: Union[str, List[str]] = None, lmm_args: dict = None, verbose: bool = False, **kwargs ) -> Tuple[Dict[str, LinearModel], Dict[str, np.ndarray], Dict[str, pd.Series]]: r"""Compute linear reaction-models TODO Parameters ---------- graph : nx.Graph Reaction-reaction graph metabolome_data : pd.DataFrame Metabolome data with samples in rows and metabolites in columns groups : pd.Series Sample group annotation covariates : pd.DataFrame, optional Confounder variables to correct for. Correction is done using a Linear Mixed Model. All variables (i.e. columns) not specified as random effect variables in `random_effects` are assumed to be fixed effects variables. Generally variables should be numerical (float or integer). If you have categorical data as strings you can use the `pandas.get_dummies` function to encode them as integers. Make sure to use `drop_first` to avoid introducing collinearity see (https://stackoverflow.com/questions/31498390/how-to-get-pandas-get-dummies-to-emit-n-1-variables-to-avoid-collinearity) # noqa: E501 The correction currently only supports simple fixed and random effects inclusion. For more complex setups including factor interaction, it is recommended to do the correction beforehand and only pass the residuals to this function instead of the original metabolome data frame. compute_expl_var : bool, False Whether to return 1 - explained variance of the model or the residuals var_as_pval : bool, False Whether to return a p-value or a residual/explained variance value combined_models : bool, False Whether to compute the reference linear model based on both groups or only the 'control' residual_summary: str, "expl_var" Which method to use as residual summary statistic. Either "expl_var" for explained variance (RSS/TSS) or "norm" for p-norm scale : bool, True Whether to z-score scale metabolites control_group : str, optional Option to set which group should be viewed as the control. If None the first element in `groups` will be used. r2_threshold : float, .5 Minimum :math:`R^2` value a method needs to achieve to be further considered recompute_non_passing: bool, False Whether to recompute models with case data that failed to pass the R2 threshold for control data. outlier_threshold : float, optional Threshold to remove outliers by Cook's distance. If `None` a default on the basis of the survival function of a f-distribution is computed. random_effects: str | List[str], optional Random effects for confounder correction. If `covariates` is None this has no effect. Else, this specifies which column(s) of `covariates` to include as random effects, all other columns will be included as fixed effects. If this is None, all columns of `covariates` are assumed to be fixed effects. lmm_args: dict, optional Keyword arguments for :py:func:`MixedLM.from_formula`. Ignored unless `covariates` and `random_effects` are both not None. verbose: bool, False If True, warnings will be raised whenever a model does not pass the R2 filter kwargs Optional keyword arguments passed to model computation TODO Returns ------- Tuple[Dict[str, LinearModel], Dict[str, np.ndarray], Dict[str, pd.Series]] Control models, case residuals and all scaled residuals per reaction. If `compute_expl_var` is set to True the last element will contain the explained variance instead of the residuals. """ if groups.unique().size != 2: raise ValueError( "Exactly 2 groups must be given!" ) if control_group is None: control_group = groups.unique()[0] group_mask = groups == control_group # data preparation includes # * extracting the substrates and products per reaction # * scaling # * (possibly) confounder correction corr_residuals, reaction_nodes = _prep_data( graph, metabolome_data, covariates, scale, random_effects, **(lmm_args or {}) ) if combined_models: warnings.warn( "Combined group computation currently not implemented") if residual_summary == "expl_var": res_sum = _explained_variance elif residual_summary == "norm": res_sum = _normed_residuals else: raise ValueError( "'residual_summary' must be 'expl_var' or 'norm', not " f"{residual_summary}" ) # print-function lazy loading to avoid too many if-else repeats if verbose: def r2_warn_func(reac: str): def warn(r2: float): warnings.warn( f"{reac} failed to pass R2 with {round(r2, 4)}", category=R2Warning ) return warn else: def r2_warn_func(reac: str): def warn(r2: float): return None return warn # first step: compute linear models per reaction for control samples control_models, case_residuals = _ld_models( reaction_nodes, corr_residuals.loc[group_mask, :], res_sum, case_data=corr_residuals.loc[~group_mask, :], r2_threshold=r2_threshold, r2_warning=r2_warn_func, recompute_non_passing=recompute_non_passing, outlier_threshold=outlier_threshold, **kwargs ) if compute_expl_var: if var_as_pval: def assign_values(variances, var_arr, rea): variances[rea] = _test_explained_variances( var_arr[group_mask], var_arr[~group_mask]) else: def assign_values(variancess, var_arr, rea): variancess[rea] = var_arr # test case samples with multivariate test # => sample-wise explained_variances = {} for reaction, model in control_models.items(): yhat = model.model.predict( corr_residuals.loc[groups.index, model.substrates]) expl_var = _explained_variance( corr_residuals.loc[groups.index, model.products], yhat) assign_values(explained_variances, expl_var, reaction) return control_models, case_residuals, explained_variances # TODO: continue # * z-scores for residuals # (based on p-norm or is there a multidimensional one?) scaled_residuals = {} if var_as_pval: def transform_residuals(rea, mod): ypred = mod.model.predict( corr_residuals.loc[groups.index, model.substrates]) res = res_sum( ypred, corr_residuals.loc[groups.index, model.products], kwargs.get("p", 2) ) scaled_residuals[rea] = _test_explained_variances( res[group_mask], res[~group_mask]) else: def transform_residuals(rea, mod): scaled_control, control_dist = _scale_1D(mod.model_fit) scaled_case = _scale_1D(case_residuals[rea], control_dist) scaled_residuals[rea] = pd.Series( np.hstack((scaled_control, scaled_case)), index=np.hstack((metabolome_data.index[group_mask], metabolome_data.index[~group_mask])), ) # TODO: parallelized option for reaction, model in control_models.items(): transform_residuals(reaction, model) return control_models, case_residuals, scaled_residuals