Source code for bioneuralnet.downstream_task.subject_representation

import pandas as pd
import os
import json
import tempfile
from typing import Optional, Dict, Any, List, Union, Sequence
from pathlib import Path
from datetime import datetime

try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
except ModuleNotFoundError:
    raise ImportError(
        "SubjectRepresentation requires PyTorch. "
        "Please install it by following the instructions at: "
        "https://bioneuralnet.readthedocs.io/en/latest/installation.html"
    )

from ray import tune
from ray.tune import TuneError
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import accuracy_score, r2_score, mean_squared_error

from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split

from ..utils import get_logger
logger = get_logger(__name__)

[docs] class SubjectRepresentation: """SubjectRepresentation Class for Integrating Network Embeddings into Omics Data. This class integrates network-derived embeddings with raw omics data to create enriched subject-level profiles. It supports dimensionality reduction of embeddings (via Autoencoders or other methods) and subsequent fusion with original omics features. Attributes: omics_data (pd.DataFrame): DataFrame of omics features (columns). embeddings (pd.DataFrame): DataFrame with embeddings (indexed by feature names). phenotype_data (Optional[pd.DataFrame]): Optional DataFrame with phenotype labels. phenotype_col (str): Name of the phenotype column. reduce_method (str): Method used for dimensionality reduction (e.g., "AE"). seed (Optional[int]): Random seed for reproducibility. tune (bool): Whether to run hyperparameter tuning. output_dir (Path): Directory where results are written. """ def __init__( self, omics_data: pd.DataFrame, embeddings: pd.DataFrame, phenotype_data: Optional[pd.DataFrame] = None, phenotype_col: str = "phenotype", reduce_method: str = "AE", seed: Optional[int] = None, tune: Optional[bool] = False, output_dir: Optional[Union[str, Path]] = None, ): """Initializes the SubjectRepresentation instance. Args: omics_data (pd.DataFrame): DataFrame of omics features (columns). embeddings (pd.DataFrame): DataFrame with embeddings (indexed by feature names). phenotype_data (pd.DataFrame | None): Optional DataFrame with phenotype labels. phenotype_col (str): Name of the phenotype column. Default is "phenotype". reduce_method (str): Dimensionality reduction method (e.g., "AE"). Default is "AE". seed (int | None): Random seed for reproducibility. tune (bool | None): Whether to run hyperparameter tuning. Default is False. output_dir (str | Path | None): Directory to write results. If None, a temp directory is used. """ logger.info("Initializing SubjectRepresentation with provided data inputs.") if omics_data is None or omics_data.empty: raise ValueError("Omics data must be non-empty.") if embeddings is None: logger.warning("No embeddings provided, please review documentation to see how to generate embeddings.") raise ValueError("Embeddings must be non-empty.") if not isinstance(embeddings, pd.DataFrame): raise ValueError("Embeddings must be provided as a pandas DataFrame.") if embeddings.empty: logger.warning("No embeddings provided, please review documentation to see how to generate embeddings.") raise ValueError("Embeddings must be non-empty.") if tune and phenotype_data is None: raise ValueError("Phenotype data must be provided for classification-based tuning.") if seed is not None: torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False self.seed = seed logger.info(f"Seed set to {self.seed}.") self.omics_data = omics_data.copy(deep=True) self.embeddings = embeddings.copy(deep=True) self.phenotype_data = phenotype_data self.phenotype_col = phenotype_col self.reduce_method = reduce_method.upper() self.tune = tune # match the embedding index embeddings_features = set(self.embeddings.index) omics_features = set(self.omics_data.columns) if len(embeddings_features) != len(omics_features): raise ValueError( f"Number of features in embeddings and omics data do not match.\n" f"Embeddings: {self.embeddings.shape} and Omics: {self.omics_data.shape}" ) common_features = embeddings_features.intersection(omics_features) if len(common_features) == 0: raise ValueError( f"No common features found between the embeddings and omics data.\n" f"Embeddings: {self.embeddings.shape} and Omics: {self.omics_data.shape}" ) logger.info(f"Found {len(common_features)} common features between network and omics data.") # output directory if output_dir is None: self.temp_dir_obj = tempfile.TemporaryDirectory() self.output_dir = Path(self.temp_dir_obj.name) logger.info(f"No output_dir provided; using temporary directory: {self.output_dir}") else: self.output_dir = Path(output_dir) logger.info(f"Output directory set to: {self.output_dir}") self.output_dir.mkdir(parents=True, exist_ok=True)
[docs] def run(self) -> pd.DataFrame: """Executes the Subject Representation workflow. If tuning is enabled, runs hyperparameter tuning and uses the best config to reduce embeddings. Otherwise, uses the default reduction method. Returns: pd.DataFrame: Enhanced omics data as a DataFrame. """ logger.info("Starting Subject Representation workflow.") if self.embeddings.empty: logger.warning( "No embeddings provided. Please generate embeddings using GNNEmbeddings class.\nReturning original omics_data." ) return self.omics_data try: if self.tune: best_config = self._run_tuning() logger.info(f"Best tuning config selected: {best_config}") ae_params = best_config.get("ae_params", {"epochs": 16, "hidden_dim": 8, "lr": 1e-3, "dropout": 0.2, "activation": "relu"}) reduced = self._reduce_embeddings(method=best_config["method"], ae_params=ae_params, compressed_dim=best_config["compressed_dim"]) enhanced_omics_data = self._integrate_embeddings(reduced=reduced, method=best_config["integration_method"], alpha=best_config["alpha"], beta=best_config["beta"]) else: method = self.reduce_method.upper() ae_params_def = {"epochs": 16, "hidden_dim": 8, "lr": 1e-3, "dropout": 0.2, "activation": "relu"} reduced = self._reduce_embeddings(method=method, ae_params=ae_params_def, compressed_dim=2) enhanced_omics_data = self._integrate_embeddings(reduced, method="multiply", alpha=1.0, beta=0.8) if enhanced_omics_data.empty: logger.warning("Enhanced omics data is empty. Returning original omics_data.") return self.omics_data logger.info(f"Subject Representation completed successfully. Final shape: {enhanced_omics_data.shape}") return enhanced_omics_data except Exception as e: logger.error(f"Error in Subject Representation workflow: {e}") raise
def _reduce_embeddings(self, method: str, ae_params: Optional[dict[Any, Any]] = None, compressed_dim: int = 2) -> pd.DataFrame: """Reduces the dimensionality of embeddings using PCA or AutoEncoder. Args: method (str): Reduction method ("PCA" or "AE"). ae_params (dict | None): AutoEncoder hyperparameters (epochs, hidden_dim, lr, dropout). compressed_dim (int): Target number of dimensions. Returns: pd.DataFrame: Reduced embeddings with `compressed_dim` columns. """ logger.info(f"Reducing embeddings using method='{method}' with compressed_dim={compressed_dim}.") if self.embeddings.empty: raise ValueError("Embeddings DataFrame is empty.") method = method.lower() if method == "pca": pca = PCA(n_components=compressed_dim) pcs = pca.fit_transform(self.embeddings) columns = [] for i in range(compressed_dim): columns.append(f"PC{i+1}") reduced_df = pd.DataFrame(pcs, index=self.embeddings.index, columns=columns) logger.info(f"PCA reduction completed. Variance explained: {pca.explained_variance_ratio_.sum():.4f}") elif method in ["ae"]: logger.info("Using autoencoder for reduction.") ae_params = ae_params or { "epochs": 64, "hidden_dim": 8, "lr": 1e-3, "dropout": 0.2, "activation": "relu", } X = torch.tensor(self.embeddings.values, dtype=torch.float) model = AutoEncoder( input_dim=self.embeddings.shape[1], hidden_dims=ae_params["hidden_dim"], compressed_dim=compressed_dim, dropout=ae_params["dropout"], activation=ae_params["activation"] ) optimizer = optim.Adam(model.parameters(), lr=ae_params["lr"]) loss_fn = nn.MSELoss() model.train() for epoch in range(ae_params["epochs"]): optimizer.zero_grad() z, recon = model(X) loss = loss_fn(recon, X) loss.backward() optimizer.step() if (epoch + 1) % max(1, ae_params["epochs"] // 5) == 0: logger.info(f"AE Epoch {epoch+1}/{ae_params['epochs']} - Loss: {loss.item():.4f}") model.eval() with torch.no_grad(): z, _ = model(X) z_np = z.detach().cpu().numpy() if z_np.ndim == 1: z_np = z_np.reshape(-1, 1) columns = [] for i in range(compressed_dim): columns.append(f"AE_{i+1}") reduced_df = pd.DataFrame(z_np, index=self.embeddings.index, columns=columns) logger.info("Autoencoder reduction completed") else: raise ValueError(f"Unsupported reduction method:{method}") return reduced_df def _integrate_embeddings(self, reduced: pd.DataFrame, method="multiply", alpha=2.0, beta=0.5) -> pd.DataFrame: """Integrates reduced embeddings into omics data using a weighted multiplicative approach. Formula: `enhanced = beta * raw + (1 - beta) * (alpha * normalized_weight * raw)` Args: reduced (pd.DataFrame | pd.Series): Reduced embedding weights. method (str): Integration strategy (currently only "multiply"). alpha (float): Scaling factor for the weighted term. beta (float): Weight applied to the original raw data (0 to 1). Returns: pd.DataFrame: Enhanced omics data. """ if not isinstance(reduced, (pd.DataFrame, pd.Series)): raise ValueError("Reduced embeddings must be a pandas DataFrame or Series.") missing_features = set(self.omics_data.columns) - set(reduced.index) if missing_features: logger.warning(f"Missing {len(missing_features)} features in reduced embeddings: {list(missing_features)[:5]}") if method == "multiply": # the reduced embeddings to match the omics data columns. if not reduced.index.equals(self.omics_data.columns): logger.info("Aligning reduced embeddings index with omics_data columns.") reduced = reduced.reindex(self.omics_data.columns) # normalize the embeddings and compute a weight series if isinstance(reduced, pd.DataFrame): for col in reduced.columns: reduced[col] = (reduced[col] - reduced[col].min()) / (reduced[col].max() - reduced[col].min()) weight_series = reduced.mean(axis=1) elif isinstance(reduced, pd.Series): weight_series = (reduced - reduced.min()) / (reduced.max() - reduced.min()) weight_series = weight_series.fillna(0) ranks = weight_series.rank(method="average") scaled_ranks = 2 * (ranks - ranks.min()) / (ranks.max() - ranks.min()) - 1 # look for duplicate feature names. if not scaled_ranks.index.is_unique: logger.error("Duplicate feature names detected in the reduced embeddings index.") # re-index to ensure we have a weight per omics_data feature. feature_weights = scaled_ranks.reindex(self.omics_data.columns,fill_value=0.0) enhanced = self.omics_data.copy() for feature in self.omics_data.columns: weight_val = feature_weights.get(feature) # checking weight_val is series or scalar if isinstance(weight_val, pd.Series): weight_val = weight_val.iloc[0] if pd.notnull(weight_val): enhanced[feature] = beta * enhanced[feature] + (1 - beta) * (alpha * weight_val * enhanced[feature]) else: logger.warning(f"Feature {feature} not found in the reduced weights.") else: #Curently only supoort one method but left the parameter in case we supp0ort more in the future. raise ValueError(f"Unknown integration method: {method}.") logger.info(f"Final Enhanced Omics Shape: {enhanced.shape}") return enhanced def _run_tuning(self) -> Dict[str, Any]: """Orchestrates the hyperparameter tuning process. Returns: dict: The best configuration found by Ray Tune. """ logger.info("Running tuning for SubjectRepresentation.") return self._run_classification_tuning() def _run_classification_tuning(self) -> Dict[str, Any]: """Executes Ray Tune to find optimal reduction and integration parameters. Uses RandomForest (Classifier or Regressor) to evaluate the quality of the enhanced subject representations against the phenotype. Returns: dict: Best hyperparameter configuration. """ search_config = { "method": tune.choice(["PCA", "AE"]), "compressed_dim": tune.choice([1, 2, 3, 4]), "ae_params": { "epochs": tune.choice([64, 128, 256, 512, 1024]), "hidden_dim": tune.choice([16, 32, 64, 128, 256, 512]), "dropout": tune.choice([0.1, 0.2, 0.3, 0.4, 0.5]), "lr": tune.choice([1e-3, 5e-4, 1e-4, 1e-5]), "activation": tune.choice(["relu", "tanh", "sigmoid"]), }, "integration_method": tune.choice(["multiply"]), "alpha": tune.choice([1.5, 2.0, 2.5, 3.0]), "beta": tune.choice([0.1, 0.3, 0.5, 0.7]), } def tune_helper(config): try: method = config["method"].upper() ae_params = config["ae_params"] alpha = config["alpha"] beta = config["beta"] compressed_dim = config["compressed_dim"] reduced = self._reduce_embeddings(method=method, compressed_dim=compressed_dim,ae_params=ae_params) enhanced = self._integrate_embeddings(reduced, method=config["integration_method"], alpha=alpha, beta=beta) common_index = enhanced.index.intersection(self.phenotype_data.index) X = enhanced.loc[common_index].values y = self.phenotype_data.loc[common_index, self.phenotype_col] is_classification = y.dtype != float and y.nunique() <= 20 X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3) if is_classification: model = RandomForestClassifier() model.fit(X_train, y_train.astype(int)) y_pred = model.predict(X_test) score = accuracy_score(y_test, y_pred) else: model = RandomForestRegressor() model.fit(X_train, y_train) y_pred = model.predict(X_test) mse = mean_squared_error(y_test, y_pred) score = -mse tune.report({"score": score}) except Exception as e: logger.error(f"Tuning trial failed: {e}") import traceback traceback.print_exc() tune.report({"score": 0.0}) scheduler = ASHAScheduler(metric="score", mode="max", grace_period=1, reduction_factor=2) reporter = CLIReporter(metric_columns=["score", "training_iteration"]) def short_dirname_creator(trial): return f"_{trial.trial_id}" resources = {"cpu": 1, "gpu": 1} if torch.cuda.is_available() else {"cpu": 1, "gpu": 0} try: analysis = tune.run( tune_helper, config=search_config, num_samples=20, scheduler=scheduler, progress_reporter=reporter, storage_path=os.path.expanduser("~/sr"), trial_dirname_creator=short_dirname_creator, resources_per_trial=resources, name="t_sr", verbose=1, ) except TuneError as e: logger.warning(f"Tuning completed with failures: {e}") analysis = None if len(e.args) > 1 and hasattr(e.args[1], "get_best_trial"): analysis = e.args[1] best_trial = None best_score = 0.0 if analysis and hasattr(analysis, "get_best_trial"): try: best_trial = analysis.get_best_trial("score", "max", "last") best_score = best_trial.last_result.get("score", 0.0) except Exception as e: logger.error(f"Could not retrieve best trial details: {e}") else: logger.error("Analysis object is invalid or missing get_best_trial().") logger.info(f"Best trial final score: {best_score}") if best_trial: timestamp = datetime.now().strftime("%m%d_%H_%M_%S") best_params_file = Path(self.output_dir) / f"Subjects_tune_{timestamp}.json" with open(best_params_file, "w") as f: json.dump(best_trial.config, f, indent=4) logger.info(f"Best Graph Embedding parameters saved to {best_params_file}") else: logger.info("No valid best trial config found; skipping save.") return best_trial.config if best_trial else {}
[docs] def get_activation(activation: str): """Maps a string to a PyTorch activation module.""" activation = activation.lower() if activation == "relu": return nn.ReLU() elif activation == "tanh": return nn.Tanh() elif activation == "sigmoid": return nn.Sigmoid() else: raise ValueError(f"Unsupported activation function: {activation}")
[docs] def generate_hidden_dims(init_dim: int, min_dim: int = 2) -> List[int]: """ Generates a list of hidden dimensions by halving the initial dimension until the minimum is reached. For example, if init_dim is 64, this returns [64, 32, 16, 8, 4, 2]. """ dims = [] current = init_dim while current >= min_dim: dims.append(current) current = current // 2 return dims
[docs] class AutoEncoder(nn.Module): """ Generic Autoencoder for configurable reduction. Builds encoder and decoder layers based on a list of hidden dimensions. Allows tuning of dropout, activation, and network architecture. """ def __init__(self, input_dim: int, hidden_dims: Union[int, Sequence[int]] = 64, compressed_dim: int = 1,dropout: float = 0.0, activation: str = "relu"): super(AutoEncoder, self).__init__() self.activation = get_activation(activation) if isinstance(hidden_dims, (int, float)): hidden_dims = generate_hidden_dims(int(hidden_dims)) elif not isinstance(hidden_dims, list): raise ValueError("hidden_dims must be an int, float, or a list of ints.") # encoder: encoder_layers = [] current_dim = input_dim for h_dim in hidden_dims: encoder_layers.append(nn.Linear(current_dim, h_dim)) encoder_layers.append(self.activation) encoder_layers.append(nn.Dropout(dropout)) current_dim = h_dim encoder_layers.append(nn.Linear(current_dim, compressed_dim)) # the * operator unpacks the list into separate arguments self.encoder = nn.Sequential(*encoder_layers) # decoder: decoder_layers = [] current_dim = compressed_dim for h_dim in reversed(hidden_dims): decoder_layers.append(nn.Linear(current_dim, h_dim)) decoder_layers.append(self.activation) decoder_layers.append(nn.Dropout(dropout)) current_dim = h_dim decoder_layers.append(nn.Linear(current_dim, input_dim)) self.decoder = nn.Sequential(*decoder_layers)
[docs] def forward(self, x): z = self.encoder(x) recon = self.decoder(z) return z, recon