Source code for bioneuralnet.clustering.louvain

r"""
Standard Louvain Method for Community Detection - NumPy Implementation.

References:
    Blondel et al. (2008), "Fast unfolding of communities in large networks."
"""

import logging
from typing import Any, Dict, List, Optional, Set, Tuple

import numpy as np
import networkx as nx

logger = logging.getLogger(__name__)

[docs] class Louvain: """ Standard Louvain community detection. This class encapsulates the multi-phase optimization algorithm for detecting communities in weighted graphs. """ def __init__( self, G: nx.Graph, weight: str = "weight", max_passes: int = 100, min_delta: float = 1e-10, seed: Optional[int] = None, ): """ Initializes the Louvain algorithm with the graph and optimization parameters. Sets up the internal adjacency structures and seeds the random number generator if specified. Args: G (nx.Graph): The input weighted undirected NetworkX graph to be analyzed. weight (str): The edge attribute name representing weights to use for the adjacency matrix. max_passes (int): The maximum number of full-node sweeps allowed per local optimization phase. min_delta (float): The minimum modularity gain required to accept a node movement. seed (Optional[int]): A random seed used for reproducibility of the node traversal order. """ self.weight_attr = weight self.max_passes = max_passes self.min_delta = min_delta if seed is not None: np.random.seed(seed) self.nodes: List[Any] = sorted(G.nodes()) self.n: int = len(self.nodes) self.node_to_idx = {nd: i for i, nd in enumerate(self.nodes)} self.idx_to_node = {i: nd for i, nd in enumerate(self.nodes)} A = np.zeros((self.n, self.n), dtype=np.float64) for u, v, data in G.edges(data=True): w = float(data.get(weight, 1.0)) i, j = self.node_to_idx[u], self.node_to_idx[v] if i == j: A[i, i] = w else: A[i, j] = w A[j, i] = w self.A_orig = A self._partition: Optional[Dict[Any, int]] = None self._modularity: Optional[float] = None self._history: List[Dict[str, Any]] = [] logger.info(f"Louvain: {self.n} nodes, " f"{G.number_of_edges()} edges") @staticmethod def _delta_Q( node: int, target_comm: int, community: np.ndarray, A: np.ndarray, k: np.ndarray, m: float, ) -> float: """ Calculates the modularity gain (DeltaQ) for placing an isolated node into a target community. This uses the formula DeltaQ = k_{v,in} / m - Sigma_tot * k_v / (2 * m^2) where terms are computed excluding the node itself. Args: node (int): The integer index of the node being considered for movement. target_comm (int): The integer ID of the candidate community. community (np.ndarray): An array mapping every node index to its current community ID. A (np.ndarray): The adjacency matrix representing the graph structure and weights. k (np.ndarray): An array containing the weighted degree (strength) of each node. m (float): The total weight of all edges in the graph. Returns: float: The calculated change in modularity if the node were added to the target community. """ mask = (community == target_comm) mask = mask.copy() mask[node] = False k_v_in = A[node][mask].sum() sigma_tot = k[mask].sum() k_v = k[node] return k_v_in / m - sigma_tot * k_v / (2.0 * m * m) @staticmethod def _phase1( A: np.ndarray, k: np.ndarray, m: float, community: np.ndarray, max_passes: int, min_delta: float, ) -> Tuple[np.ndarray, int]: """ Executes the local optimization phase by moving nodes between communities to maximize modularity. Iteratively evaluates moving each node to neighbor communities and applies the move that yields the highest positive net gain. Args: A (np.ndarray): The current adjacency matrix of the graph or super-graph. k (np.ndarray): An array of node degrees corresponding to the current adjacency matrix. m (float): The total edge weight of the network. community (np.ndarray): The current community assignment for each node. max_passes (int): The limit on the number of iterations over all nodes. min_delta (float): The threshold for modularity improvement required to trigger a move. Returns: Tuple[np.ndarray, int]: A tuple containing the updated community assignments and the total number of moves performed. """ n = A.shape[0] total_moves = 0 improved = True pass_num = 0 while improved and pass_num < max_passes: improved = False pass_num += 1 order = np.random.permutation(n) for node in order: cur_comm = int(community[node]) nbr_idx = np.nonzero(A[node])[0] if len(nbr_idx) == 0: continue candidate_comms = set(int(c) for c in community[nbr_idx]) candidate_comms.add(cur_comm) best_dQ = 0.0 best_comm = cur_comm stay_dQ = Louvain._delta_Q( node, cur_comm, community, A, k, m ) for c in candidate_comms: if c == cur_comm: continue insert_dQ = Louvain._delta_Q( node, c, community, A, k, m ) net = insert_dQ - stay_dQ if net > best_dQ + min_delta: best_dQ = net best_comm = c if best_comm != cur_comm: community[node] = best_comm improved = True total_moves += 1 logger.debug(f" Phase 1: {pass_num} passes, {total_moves} moves") return community, total_moves @staticmethod def _phase2( A: np.ndarray, community: np.ndarray, n2o: Dict[int, Set[int]], ) -> Tuple[np.ndarray, np.ndarray, float, np.ndarray, Dict[int, Set[int]]]: """ Aggregates the graph based on the community structure found in Phase 1 to create super-nodes. Compresses the adjacency matrix such that new nodes represent previous communities and edge weights are summed. Args: A (np.ndarray): The adjacency matrix of the graph before aggregation. community (np.ndarray): The community assignments for the nodes in the current graph. n2o (Dict[int, Set[int]]): A mapping from current node indices to the set of original node indices they represent. Returns: Tuple[np.ndarray, np.ndarray, float, np.ndarray, Dict[int, Set[int]]]: A tuple containing the new adjacency matrix, degrees, total weight, initial community vector, and updated node mapping. """ unique_comms = np.unique(community) n_new = len(unique_comms) comm_map = {int(c): i for i, c in enumerate(unique_comms)} n_old = A.shape[0] mapping = np.array([comm_map[int(community[i])] for i in range(n_old)]) M = np.zeros((n_old, n_new), dtype=np.float64) M[np.arange(n_old), mapping] = 1.0 new_A = M.T @ A @ M new_k = new_A.sum(axis=1) new_m = new_A.sum() / 2.0 new_n2o: Dict[int, Set[int]] = {} for old_idx in range(n_old): new_idx = mapping[old_idx] new_n2o.setdefault(new_idx, set()).update(n2o[old_idx]) new_community = np.arange(n_new, dtype=np.int64) logger.debug(f" Phase 2: {n_old} -> {n_new} super-nodes") return new_A, new_k, new_m, new_community, new_n2o
[docs] def run(self) -> Dict[Any, int]: """ Executes the full Louvain algorithm by alternating between local optimization and graph aggregation. Loops until the modularity converges or the graph cannot be aggregated further. Returns: Dict[Any, int]: A dictionary mapping original node identifiers to their final community IDs. """ self._history.clear() A = self.A_orig.copy() k = A.sum(axis=1) m = A.sum() / 2.0 community = np.arange(self.n, dtype=np.int64) n2o: Dict[int, Set[int]] = {i: {i} for i in range(self.n)} level = 0 while True: level += 1 logger.info(f"Level {level}: {A.shape[0]} nodes") community, moves = self._phase1( A, k, m, community, self.max_passes, self.min_delta ) n_comms = len(np.unique(community)) self._history.append({ "level": level, "nodes_at_level": A.shape[0], "moves": moves, "communities": n_comms, }) if moves == 0 or n_comms >= A.shape[0]: break A, k, m, community, n2o = self._phase2(A, community, n2o) if A.shape[0] <= 1: break partition: Dict[Any, int] = {} for super_idx in range(len(community)): cid = int(community[super_idx]) for orig_idx in n2o[super_idx]: partition[self.idx_to_node[orig_idx]] = cid remap = {old: new for new, old in enumerate(sorted(set(partition.values())))} self._partition = {nd: remap[c] for nd, c in partition.items()} self._modularity = self._compute_modularity() logger.info( f"Louvain done: {len(remap)} communities, " f"Q = {self._modularity:.6f}, levels = {level}" ) return self._partition
def _compute_modularity(self) -> float: """ Computes the final modularity score (Q) of the partition on the original graph. This serves as a verification step ensuring the result aligns with the standard modularity definition. Returns: float: The modularity score of the final partition. """ if self._partition is None: return 0.0 A = self.A_orig k = A.sum(axis=1) m = A.sum() / 2.0 if m == 0: return 0.0 comm = np.array([self._partition[self.idx_to_node[i]] for i in range(self.n)]) Q = 0.0 for i in range(self.n): for j in range(self.n): if comm[i] == comm[j]: Q += A[i, j] - k[i] * k[j] / (2.0 * m) return Q / (2.0 * m) @property def partition(self) -> Dict[Any, int]: """ Retrieves the final partition of the graph. Requires that the run() method has been executed previously. Returns: Dict[Any, int]: A dictionary mapping nodes to community IDs. """ if self._partition is None: raise ValueError("Call run() first.") return self._partition @property def communities(self) -> Dict[int, List[Any]]: """ Retrieves the communities grouped by community ID. Convenient for iterating over sets of nodes belonging to the same community. Returns: Dict[int, List[Any]]: A dictionary mapping community IDs to lists of nodes. """ d: Dict[int, List[Any]] = {} for nd, cid in self.partition.items(): d.setdefault(cid, []).append(nd) return d @property def modularity(self) -> float: """ Retrieves the final modularity score of the computed partition. Requires that the run() method has been executed previously. Returns: float: The modularity score. """ if self._modularity is None: raise ValueError("Call run() first.") return self._modularity @property def history(self) -> List[Dict[str, Any]]: """ Retrieves the history of the algorithm's execution levels. Provides insight into the convergence process and reduction of graph size. Returns: List[Dict[str, Any]]: A list of dictionaries containing stats for each level. """ return list(self._history)