Pulsar package API¶
- class pulsar.BallMapper(eps)¶
Bases: object
A fitted Ball Mapper complex.
Ball Mapper decomposes a point cloud into overlapping balls and represents connectivity as a graph. Designed for large-scale EHR data.
- edges¶
- eps¶
- fit(points)¶
Fit the Ball Mapper to a point cloud.
- n_edges()¶
- n_nodes()¶
- nodes¶
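The construction can be sketched as a greedy epsilon-net in pure NumPy — an illustration of the idea only, not the parallelised Rust implementation; the helper name is hypothetical:

```python
import numpy as np

def ball_mapper_sketch(points: np.ndarray, eps: float):
    """Greedy epsilon-net: take the first uncovered point as a landmark,
    form a ball of all points within eps of it, repeat until every point
    is covered; connect two balls when their member sets overlap."""
    n = len(points)
    covered = np.zeros(n, dtype=bool)
    nodes = []  # one array of member indices per ball
    for i in range(n):
        if covered[i]:
            continue
        dists = np.linalg.norm(points - points[i], axis=1)
        members = np.where(dists <= eps)[0]
        nodes.append(members)
        covered[members] = True
    edges = [
        (a, b)
        for a in range(len(nodes))
        for b in range(a + 1, len(nodes))
        if np.intersect1d(nodes[a], nodes[b]).size > 0
    ]
    return nodes, edges
```

Every point lands in at least one ball, and two balls share an edge exactly when they share a member.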
- class pulsar.ColumnProfile(name: str, dtype: str, is_numeric: bool, n_unique: int, n_missing: int, missing_pct: float, sample_values: list[str], mean: float | None, std: float | None, min_val: float | None, max_val: float | None, top_values: list[tuple[str, int]] | None)[source]¶
Bases: object
Per-column metadata for LLM preprocessing decisions.
- dtype: str¶
- is_numeric: bool¶
- max_val: float | None¶
- mean: float | None¶
- min_val: float | None¶
- missing_pct: float¶
- n_missing: int¶
- n_unique: int¶
- name: str¶
- sample_values: list[str]¶
- std: float | None¶
- top_values: list[tuple[str, int]] | None¶
- class pulsar.CosmicGraph¶
Bases: object
Python-facing Cosmic Graph class.
```python
from pulsar._pulsar import CosmicGraph, pseudo_laplacian
import numpy as np

# Accumulate pseudo-Laplacians across all ball maps in the sweep
galactic_L = np.zeros((n, n), dtype=np.int64)
for bm in ball_maps:
    galactic_L += pseudo_laplacian(bm.nodes, n)

# Build the Cosmic Graph
cg = CosmicGraph.from_pseudo_laplacian(galactic_L, threshold=0.0)
print(cg.weighted_adj)  # float weights in [0, 1]
print(cg.adj)           # binary adjacency (uint8)
```
- adj¶
Binary adjacency matrix, shape (n, n), dtype uint8.
Entry (i, j) = 1 iff weighted_adj[i, j] > threshold.
- static from_pseudo_laplacian(l, threshold)¶
Build a Cosmic Graph from an accumulated pseudo-Laplacian matrix.
# Parameters
- l (np.ndarray[int64, 2D], shape (n, n)) — summed pseudo-Laplacian from all Ball Maps in the parameter sweep.
- threshold (float) — edges with weight ≤ threshold are excluded from the binary adjacency matrix. Typical value: 0.0.
# Returns
A CosmicGraph instance.
- n¶
Number of data points (side length of both adjacency matrices).
- weighted_adj¶
Weighted adjacency matrix, shape (n, n), values in [0, 1].
Entry (i, j) represents normalised co-membership between points i and j across all Ball Maps in the sweep.
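The documented rule linking adj to weighted_adj can be shown with plain NumPy (toy weights, not produced by the library):

```python
import numpy as np

# Hypothetical weighted adjacency for three points
W = np.array([[0.0, 0.8, 0.0],
              [0.8, 0.0, 0.1],
              [0.0, 0.1, 0.0]])

threshold = 0.0
# adj[i, j] = 1 iff weighted_adj[i, j] > threshold
adj = (W > threshold).astype(np.uint8)
```

With threshold 0.0, any strictly positive co-membership weight produces a binary edge.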
- class pulsar.DatasetProfile(n_samples: int, n_features: int, n_columns_total: int, missingness_pct: float, knn_k5_mean: float, knn_k10_mean: float, knn_k20_mean: float, pca_cumulative_variance: list[tuple[int, float]], column_profiles: list[ColumnProfile])[source]¶
Bases: object
Raw measurements only — no derived decisions.
- column_profiles: list[ColumnProfile]¶
- knn_k10_mean: float¶
- knn_k20_mean: float¶
- knn_k5_mean: float¶
- missingness_pct: float¶
- n_columns_total: int¶
- n_features: int¶
- n_samples: int¶
- pca_cumulative_variance: list[tuple[int, float]]¶
- class pulsar.PCA(n_components, seed, n_oversamples=10, n_power_iter=2)¶
Bases: object
Randomized PCA optimized for large datasets.
Uses randomized SVD (Halko et al. 2011) which is O(n*d*k) instead of O(n*d² + d³) for exact SVD. Different seeds produce different (but equally valid) principal components, enabling ensemble diversity.
```python
from pulsar._pulsar import PCA

pca = PCA(n_components=10, seed=42)
X_reduced = pca.fit_transform(X)
```
- explained_variance¶
Explained variance per component.
- fit_transform(data)¶
Fit PCA and return the low-dimensional projection.
- transform(data)¶
Project new data using fitted components.
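The underlying algorithm can be sketched in NumPy for intuition — a simplified rendering of Halko-style randomized SVD, not the Rust implementation; the helper name is hypothetical:

```python
import numpy as np

def randomized_pca_sketch(X, n_components, seed, n_oversamples=10, n_power_iter=2):
    rng = np.random.default_rng(seed)
    Xc = X - X.mean(axis=0)                        # center columns
    k = n_components + n_oversamples
    Omega = rng.standard_normal((Xc.shape[1], k))  # random test matrix
    Y = Xc @ Omega                                 # sample the range of Xc
    for _ in range(n_power_iter):                  # power iterations sharpen the spectrum
        Y = Xc @ (Xc.T @ Y)
    Q, _ = np.linalg.qr(Y)                         # orthonormal basis for the range
    B = Q.T @ Xc                                   # small k x d matrix
    _, _, Vt = np.linalg.svd(B, full_matrices=False)
    components = Vt[:n_components]                 # approximate principal axes
    return Xc @ components.T                       # low-dimensional projection
```

The seed fixes the random test matrix, which is why different seeds give different but equally valid component sets.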
- class pulsar.PulsarConfig(data: 'str', impute: 'dict[str, ImputeSpec]', encode: 'dict[str, EncodeSpec]', drop_columns: 'list[str]', pca: 'PCASpec', ball_mapper: 'BallMapperSpec', cosmic_graph: 'CosmicGraphSpec', n_reps: 'int' = 4, run_name: 'str' = '')[source]¶
Bases: object
- ball_mapper: BallMapperSpec¶
- cosmic_graph: CosmicGraphSpec¶
- data: str¶
- drop_columns: list[str]¶
- encode: dict[str, EncodeSpec]¶
- impute: dict[str, ImputeSpec]¶
- n_reps: int = 4¶
- run_name: str = ''¶
- class pulsar.StandardScaler¶
Bases: object
Python-facing standard scaler.
Call fit_transform first to fit the scaler and scale the training data. Then call transform on new data using the stored statistics, or inverse_transform to recover the original scale.
```python
from pulsar._pulsar import StandardScaler

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
X_recovered = scaler.inverse_transform(X_scaled)
```
- fit_transform(data)¶
Fit the scaler to data and return the scaled matrix.
Stores column means and standard deviations internally so that transform / inverse_transform can be called later.
# Parameters
- data (np.ndarray[float64, 2D], shape (n_samples, n_features))
# Returns
np.ndarray[float64, 2D] — scaled matrix with mean ≈ 0, std ≈ 1 per column.
- inverse_transform(data)¶
Undo scaling: x_orig = x_scaled * σ + μ.
# Raises
ValueError — if fit_transform has not been called yet, or if data has a different number of columns than the fitted data.
- transform(data)¶
Scale data using statistics from fit_transform.
# Raises
ValueError — if fit_transform has not been called yet, or if data has a different number of columns than the fitted data.
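The scaling maths can be checked with a few lines of NumPy — a sketch of the documented behaviour (population std assumed here; the Rust code may use a different denominator):

```python
import numpy as np

X = np.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
mu = X.mean(axis=0)      # stored column means
sigma = X.std(axis=0)    # stored column standard deviations

X_scaled = (X - mu) / sigma          # what fit_transform returns
X_recovered = X_scaled * sigma + mu  # inverse_transform: x_orig = x_scaled * sigma + mu
```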
- class pulsar.TemporalCosmicGraph(tensor: ndarray, threshold: float = 0.0)[source]¶
Bases: object
Cosmic Graph for longitudinal time-series data.
Stores a 3D tensor W[i, j, t] of edge weights and provides methods to aggregate into summary 2D graphs.
- property T: int¶
Number of time steps.
- change_point_graph() ndarray[source]¶
Compute change-point graph: maximum absolute change between consecutive time steps.
W_change[i,j] = max_t |W[i,j,t+1] - W[i,j,t]|
Clinical meaning: Identifies sudden state transitions — acute events, medication changes, procedure effects.
- Returns:
2D array of shape (n, n) with non-negative values.
- Return type:
np.ndarray
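The formula can be reproduced with NumPy on a toy tensor (illustrative only, not the library's internal code):

```python
import numpy as np

rng = np.random.default_rng(0)
W = rng.random((4, 4, 5))  # toy (n, n, T) weight tensor

# W_change[i, j] = max_t |W[i, j, t+1] - W[i, j, t]|
W_change = np.abs(np.diff(W, axis=2)).max(axis=2)
```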
- classmethod from_snapshots(snapshots: list[ndarray], config: PulsarConfig, threshold: float = 0.0) TemporalCosmicGraph[source]¶
Build a TemporalCosmicGraph from time-indexed data snapshots.
Runs the standard Pulsar pipeline (scale → PCA → BallMapper → pseudo-Laplacian) independently at each time step, then stacks results into a 3D tensor.
- Parameters:
snapshots (list[np.ndarray]) – List of T arrays, each of shape (n, features_t). The number of rows n must be consistent across all snapshots (same node set over time).
config (PulsarConfig) – Pulsar configuration specifying PCA dimensions, seeds, epsilon values, etc.
threshold (float) – Default threshold for binary adjacency operations.
- Returns:
Instance with 3D tensor of shape (n, n, T).
- Return type:
TemporalCosmicGraph
- mean_graph() ndarray[source]¶
Compute mean graph: average edge weight across all time steps.
W_mean[i,j] = mean_t(W[i,j,t])
Clinical meaning: Overall similarity accounting for all observations equally.
- Returns:
2D array of shape (n, n) with values in [0, 1].
- Return type:
np.ndarray
- property n: int¶
Number of nodes.
- persistence_graph(threshold: float | None = None) ndarray[source]¶
Compute persistence graph: fraction of time steps where edge exceeds threshold.
W_persist[i,j] = mean_t(W[i,j,t] > τ)
Clinical meaning: Identifies node pairs that are always similar — stable relationships that persist across the observation window.
- Parameters:
threshold (float, optional) – Edge weight threshold. Defaults to instance threshold.
- Returns:
2D array of shape (n, n) with values in [0, 1].
- Return type:
np.ndarray
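The formula amounts to thresholding then averaging over time, as a toy NumPy example shows (illustrative only):

```python
import numpy as np

rng = np.random.default_rng(0)
W = rng.random((4, 4, 5))  # toy (n, n, T) weight tensor
tau = 0.5

# W_persist[i, j] = fraction of time steps where W[i, j, t] > tau
W_persist = (W > tau).mean(axis=2)
```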
- recency_graph(decay: float = 0.9) ndarray[source]¶
Compute recency-weighted graph: exponentially decayed sum favoring recent observations.
W_recent[i,j] = Σ_t λ^(T-1-t) · W[i,j,t] / Σ_t λ^(T-1-t)
where λ ∈ (0, 1) is the decay factor.
Clinical meaning: Current similarity for real-time decision support, where recent observations matter more than distant history.
- Parameters:
decay (float) – Decay factor λ in (0, 1). Values closer to 1 make the weights more uniform across time (less recency emphasis), while smaller values place more weight on the most recent steps. Default 0.9 means each step back is weighted 0.9x the previous.
- Returns:
2D array of shape (n, n) with values in [0, 1].
- Return type:
np.ndarray
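The decay weighting can be written out in NumPy on a toy tensor (illustrative only):

```python
import numpy as np

rng = np.random.default_rng(0)
W = rng.random((4, 4, 6))  # toy (n, n, T) weight tensor
decay = 0.9
T = W.shape[2]

# lambda^(T-1-t): the most recent step (t = T-1) gets weight 1
weights = decay ** (T - 1 - np.arange(T))
W_recent = (W * weights).sum(axis=2) / weights.sum()
```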
- property shape: tuple[int, int, int]¶
Shape of the tensor (n, n, T).
- slice(start: int = 0, end: int | None = None) TemporalCosmicGraph[source]¶
Extract a time-range subset of the tensor.
- Parameters:
start (int) – Start time index (inclusive).
end (int, optional) – End time index (exclusive). Defaults to T.
- Returns:
New instance with sliced tensor.
- Return type:
TemporalCosmicGraph
- property tensor: ndarray¶
3D weighted adjacency tensor of shape (n, n, T).
- to_networkx(aggregation: Literal['persistence', 'mean', 'recency', 'volatility', 'trend', 'change_point'] = 'persistence', threshold: float | None = None, **kwargs) Graph[source]¶
Convert an aggregated graph to NetworkX format.
- Parameters:
aggregation (str) – Which aggregation method to use. One of: “persistence”, “mean”, “recency”, “volatility”, “trend”, “change_point”.
threshold (float, optional) – Edge weight threshold for including edges. Defaults to instance threshold. For aggregation=”persistence”, this value is also used as the persistence threshold passed to persistence_graph.
**kwargs – Additional arguments passed through to the selected aggregation method (e.g., decay=0.9 for recency_graph). Unsupported arguments raise TypeError.
- Returns:
NetworkX graph with ‘weight’ edge attributes.
- Return type:
nx.Graph
- trend_graph() ndarray[source]¶
Compute trend graph: slope of linear regression over time for each edge.
W_trend[i,j] = slope of linear fit to W[i,j,:]
Clinical meaning: Positive values indicate converging nodes (becoming more similar over time), negative values indicate diverging nodes.
- Returns:
2D array of shape (n, n). Values can be positive or negative.
- Return type:
np.ndarray
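The per-edge slope can be computed with np.polyfit, shown here on a synthetic tensor with a known slope (illustrative, not the library's internal code):

```python
import numpy as np

n, T = 4, 6
t = np.arange(T, dtype=float)
# every edge weight grows linearly with slope 0.1
W = np.broadcast_to(0.1 * t, (n, n, T)).copy()

# least-squares slope of W[i, j, :] for every edge at once
slopes = np.polyfit(t, W.reshape(-1, T).T, 1)[0].reshape(n, n)
```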
- volatility_graph() ndarray[source]¶
Compute volatility graph: temporal variance of each edge.
W_volatile[i,j] = var_t(W[i,j,t])
Clinical meaning: Identifies node pairs whose similarity is unstable — one or both may be on a trajectory (deteriorating, responding to treatment).
- Returns:
2D array of shape (n, n) with non-negative values.
- Return type:
np.ndarray
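The volatility formula is a plain temporal variance, as a toy NumPy example shows (illustrative only):

```python
import numpy as np

rng = np.random.default_rng(0)
W = rng.random((4, 4, 5))  # toy (n, n, T) weight tensor

# W_volatile[i, j] = var_t(W[i, j, t])
W_volatile = W.var(axis=2)
```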
- class pulsar.ThemaRS(config: str | dict | PulsarConfig)[source]¶
Bases: object
End-to-end Pulsar pipeline orchestrator.
Usage:
```python
model = ThemaRS("params.yaml").fit()
graph = model.cosmic_graph      # networkx.Graph
adj = model.weighted_adjacency  # np.ndarray (n, n)
```
- property ball_maps: list[BallMapper]¶
All fitted BallMapper objects across the parameter grid.
- property cosmic_graph: Graph¶
Cosmic graph as a NetworkX graph with ‘weight’ edge attributes.
- property data: DataFrame¶
The original DataFrame passed to fit() (before preprocessing).
- fit(data: DataFrame | None = None, *, progress_callback: Callable[[str, float], None] | None = None, _precomputed_embeddings: list | None = None) ThemaRS[source]¶
Run the full pipeline:
1. Load data (if not provided)
2. Impute columns (Rust)
3. Add imputation indicator flags (Python)
4. Standard-scale (Rust)
5. PCA grid (Rust)
6. BallMapper grid (Rust, rayon-parallel)
7. Accumulate pseudo-Laplacians (Rust + numpy)
8. Build CosmicGraph (Rust)
- Parameters:
data – Input DataFrame. If None, loaded from config data path.
progress_callback – Optional (stage: str, fraction: float) -> None. Called at the end of each pipeline stage with the stage name and cumulative progress in [0.0, 1.0]. Exceptions in the callback propagate and abort fit(). Pass None to disable (default).
_precomputed_embeddings – Internal — cached PCA embeddings from a prior fit() call. Skips pca_grid() when provided.
Returns self for method chaining.
- fit_multi(datasets: list[DataFrame], *, progress_callback: Callable[[str, float], None] | None = None, store_ball_maps: bool = False, ballmap_batch_size: int | None = None, rayon_workers: int | None = None) ThemaRS[source]¶
Run the pipeline over multiple data versions (e.g. different embedding models) and fuse them via pseudo-Laplacian accumulation.
Each DataFrame must have the same number of rows (same points, different representations). The sweep is run independently on each version and all resulting ball maps are accumulated into a single CosmicGraph — so a high edge weight means two points are topological neighbours across all representations, not just one.
Imputation and column-dropping are applied per-dataset if configured. All datasets must yield the same n after preprocessing.
- Parameters:
datasets – List of DataFrames (same points, different representations).
progress_callback – Optional (stage: str, fraction: float) -> None. Same semantics as in fit(). Stages are prefixed with dataset index (e.g. “Dataset 1/3: pca”).
store_ball_maps – If True, retain fitted BallMapper objects on self. Defaults to False to lower memory; when False, BallMappers are freed after their Laplacian contributions are accumulated.
ballmap_batch_size – Optional cap on how many PCA embeddings to process per BallMapper batch. Smaller batches reduce peak RAM at the cost of more Rust crossings. None processes all embeddings together.
rayon_workers – Optional cap for Rayon worker threads used inside Rust ops (PCA grid, BallMapper grid, Laplacian accumulation). Defaults to the library setting when None.
Returns self for method chaining.
- property preprocessed_data: DataFrame¶
DataFrame after drop/impute/encode/dropna — row-aligned with graph nodes.
- property resolved_threshold: float¶
The actual threshold used (resolved from ‘auto’ or the manual value).
- select_representatives(n_reps: int | None = None) list[BallMapper][source]¶
Select n_reps diverse representative BallMapper instances by clustering them based on structural similarity (node count and coverage overlap).
Returns a list of n_reps BallMapper objects.
- property stability_result: StabilityResult | None¶
Stability analysis result (only available if threshold=’auto’).
- property weighted_adjacency: ndarray¶
n×n float64 weighted adjacency matrix.
- pulsar.accumulate_pseudo_laplacians(ball_maps, n)¶
Accumulate pseudo-Laplacians from all ball maps in parallel.
This is the optimized entry point that replaces sequential Python loops. Uses rayon parallel map-reduce for maximum throughput.
```python
# Single call replaces 4000+ Python/Rust crossings
galactic_L = accumulate_pseudo_laplacians(ball_maps, n)
```
- pulsar.accumulate_temporal_pseudo_laplacians(ball_maps_per_time, n)¶
Accumulate pseudo-Laplacians across time steps into a 3D tensor.
This function processes ball maps from multiple time steps in parallel, producing a 3D tensor of shape (n, n, T) where each slice [:, :, t] is the accumulated pseudo-Laplacian for time step t.
# Parameters
- ball_maps_per_time (list[list[BallMapper]]) — for each time step, a list of BallMapper objects from the parameter sweep at that time.
- n (int) — number of nodes (must be consistent across all time steps).
# Returns
A numpy array of shape (n, n, T) with dtype int64.
# Example
```python
from pulsar._pulsar import accumulate_temporal_pseudo_laplacians

# ball_maps_per_time[t] contains all BallMappers for time step t
L_tensor = accumulate_temporal_pseudo_laplacians(ball_maps_per_time, n)
print(L_tensor.shape)  # (n, n, T)
```
- pulsar.ball_mapper_grid(embeddings, epsilons)¶
Run Ball Mapper for every (embedding, epsilon) pair in parallel.
This is the main entry point for grid search. Parallelised across all combinations using rayon for maximum throughput on large datasets.
Complexity per fit: O(n * k) where k = number of balls. No O(n²) memory allocation, so it scales to large EHR datasets.
- pulsar.characterize_dataset(csv_path: str, subsample: int = 1000, seed: int = 42, *, dataframe: DataFrame | None = None) DatasetProfile[source]¶
Probes dataset geometry before fitting to return raw geometric facts.
- Parameters:
csv_path – Path to CSV file (must have >=2 numeric columns)
subsample – Max rows to analyze (for speed on large datasets)
seed – Random seed for reproducibility
dataframe – Optional pre-loaded DataFrame. When provided, csv_path is ignored for reading (but still used for logging/identification).
- Returns:
DatasetProfile containing pure empirical facts.
- Raises:
ValueError – If CSV has fewer than 2 numeric columns
FileNotFoundError – If CSV file not found
- pulsar.config_to_yaml(cfg: PulsarConfig) str[source]¶
Serialize a PulsarConfig to a reproducible YAML string.
Inverse of load_config; every field is written explicitly so the resulting YAML can recreate the exact same pipeline run.
- pulsar.cosmic_clusters(cosmic_graph: Graph, method: str = 'agglomerative', n_clusters: int = 5) ndarray[source]¶
Run clustering on the cosmic graph adjacency matrix. Returns an (n,) int array of cluster labels.
method: “agglomerative” | “spectral”
- pulsar.cosmic_to_networkx(cg) Graph[source]¶
Convert a CosmicGraph Rust object to a NetworkX graph with ‘weight’ attributes.
- pulsar.graph_to_dataframe(ball_mapper, data: DataFrame) DataFrame[source]¶
Return a DataFrame with one row per ball node, including: node_id, size (member count), centroid coordinates, mean/std of each original feature for members in that node.
- pulsar.impute_column(values, method, seed=0)¶
Python-facing wrapper around [impute_column_inplace].
Clones the input array, fills NaN values using the chosen method, and returns a new array. The original array is not modified.
# Parameters (Python)
- values (np.ndarray[float64, 1D]) — column to impute.
- method (str) — one of “sample_normal”, “sample_categorical”, “fill_mean”, “fill_median”, “fill_mode”.
- seed (int, default 0) — RNG seed; only used by “sample_normal” and “sample_categorical”.
# Returns
A new np.ndarray[float64, 1D] with NaN values replaced.
# Raises
ValueError — if all values are NaN or the method name is unrecognised.
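A pure-NumPy equivalent of the “fill_mean” path — a sketch of the documented behaviour, not the Rust code; the helper name is hypothetical:

```python
import numpy as np

def fill_mean_sketch(values: np.ndarray) -> np.ndarray:
    """Clone the array and replace NaN with the mean of observed values;
    the original array is not modified."""
    out = values.copy()
    mask = np.isnan(out)
    if mask.all():
        raise ValueError("all values are NaN")
    out[mask] = out[~mask].mean()
    return out
```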
- pulsar.label_points(ball_mapper, n: int) ndarray[source]¶
Return an (n,) int64 array: for each data point, the ID of its first ball assignment (-1 if not covered by any ball).
- pulsar.load_config(path_or_dict: str | dict) PulsarConfig[source]¶
Load a PulsarConfig from a YAML file path or a raw dict.
- pulsar.membership_matrix(ball_mapper, n: int) ndarray[source]¶
Return a dense (n, n_balls) binary uint8 matrix. M[i, b] = 1 if point i belongs to ball b.
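Both membership_matrix and label_points can be illustrated with a toy cover (hypothetical ball memberships, built by hand rather than by the library):

```python
import numpy as np

# Cover of n = 5 points by 2 balls (member index lists)
balls = [np.array([0, 1, 2]), np.array([2, 3])]
n = 5

M = np.zeros((n, len(balls)), dtype=np.uint8)
for b, members in enumerate(balls):
    M[members, b] = 1  # M[i, b] = 1 iff point i belongs to ball b

# First-ball labels in the spirit of label_points: -1 for uncovered points
labels = np.where(M.any(axis=1), M.argmax(axis=1), -1)
```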
- pulsar.normalize_temporal_laplacian(l)¶
Normalize a 3D pseudo-Laplacian tensor into weighted adjacency matrices.
Applies the cosmic graph normalization formula independently at each time step, producing a 3D tensor of edge weights in [0, 1].
# Parameters
- l (np.ndarray[int64, 3D], shape (n, n, T)) — the accumulated pseudo-Laplacian tensor from accumulate_temporal_pseudo_laplacians.
# Returns
A numpy array of shape (n, n, T) with dtype float64, where each slice [:, :, t] contains edge weights in [0, 1].
# Example
```python
from pulsar._pulsar import (
    accumulate_temporal_pseudo_laplacians,
    normalize_temporal_laplacian,
)

L_tensor = accumulate_temporal_pseudo_laplacians(ball_maps_per_time, n)
W_tensor = normalize_temporal_laplacian(L_tensor)
print(W_tensor.shape)                  # (n, n, T)
print(W_tensor.min(), W_tensor.max())  # 0.0, ~1.0
```
- pulsar.pca_grid(data, dimensions, seeds, n_oversamples=10, n_power_iter=2)¶
Compute PCA embeddings for multiple dimensions and seeds in parallel.
Optimized for grid search: computes one SVD per seed at max dimension, then slices for each requested dimension. Parallelised across seeds.
# Returns
List of 2D arrays in row-major order: for each seed (outer), all dimensions (inner). So pca_grid(X, [2, 3], [42, 7]) returns [X_s42_d2, X_s42_d3, X_s7_d2, X_s7_d3].
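The seed-major, dimension-minor ordering can be captured by a small indexing helper (names here are illustrative):

```python
dimensions = [2, 3]
seeds = [42, 7]

def grid_index(seed_idx: int, dim_idx: int) -> int:
    """Position of the (seed, dimension) embedding in the flat pca_grid result."""
    return seed_idx * len(dimensions) + dim_idx
```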
- pulsar.unclustered_points(ball_mapper, n: int) list[int][source]¶
Return list of point indices not covered by any ball.
ThemaRS — orchestrates the full Pulsar pipeline.
- class pulsar.pipeline.ThemaRS(config: str | dict | PulsarConfig)[source]¶
Bases: object
End-to-end Pulsar pipeline orchestrator.
Usage:
```python
model = ThemaRS("params.yaml").fit()
graph = model.cosmic_graph      # networkx.Graph
adj = model.weighted_adjacency  # np.ndarray (n, n)
```
- property ball_maps: list[BallMapper]¶
All fitted BallMapper objects across the parameter grid.
- property cosmic_graph: Graph¶
Cosmic graph as a NetworkX graph with ‘weight’ edge attributes.
- property data: DataFrame¶
The original DataFrame passed to fit() (before preprocessing).
- fit(data: DataFrame | None = None, *, progress_callback: Callable[[str, float], None] | None = None, _precomputed_embeddings: list | None = None) ThemaRS[source]¶
Run the full pipeline:
1. Load data (if not provided)
2. Impute columns (Rust)
3. Add imputation indicator flags (Python)
4. Standard-scale (Rust)
5. PCA grid (Rust)
6. BallMapper grid (Rust, rayon-parallel)
7. Accumulate pseudo-Laplacians (Rust + numpy)
8. Build CosmicGraph (Rust)
- Parameters:
data – Input DataFrame. If None, loaded from config data path.
progress_callback – Optional (stage: str, fraction: float) -> None. Called at the end of each pipeline stage with the stage name and cumulative progress in [0.0, 1.0]. Exceptions in the callback propagate and abort fit(). Pass None to disable (default).
_precomputed_embeddings – Internal — cached PCA embeddings from a prior fit() call. Skips pca_grid() when provided.
Returns self for method chaining.
- fit_multi(datasets: list[DataFrame], *, progress_callback: Callable[[str, float], None] | None = None, store_ball_maps: bool = False, ballmap_batch_size: int | None = None, rayon_workers: int | None = None) ThemaRS[source]¶
Run the pipeline over multiple data versions (e.g. different embedding models) and fuse them via pseudo-Laplacian accumulation.
Each DataFrame must have the same number of rows (same points, different representations). The sweep is run independently on each version and all resulting ball maps are accumulated into a single CosmicGraph — so a high edge weight means two points are topological neighbours across all representations, not just one.
Imputation and column-dropping are applied per-dataset if configured. All datasets must yield the same n after preprocessing.
- Parameters:
datasets – List of DataFrames (same points, different representations).
progress_callback – Optional (stage: str, fraction: float) -> None. Same semantics as in fit(). Stages are prefixed with dataset index (e.g. “Dataset 1/3: pca”).
store_ball_maps – If True, retain fitted BallMapper objects on self. Defaults to False to lower memory; when False, BallMappers are freed after their Laplacian contributions are accumulated.
ballmap_batch_size – Optional cap on how many PCA embeddings to process per BallMapper batch. Smaller batches reduce peak RAM at the cost of more Rust crossings. None processes all embeddings together.
rayon_workers – Optional cap for Rayon worker threads used inside Rust ops (PCA grid, BallMapper grid, Laplacian accumulation). Defaults to the library setting when None.
Returns self for method chaining.
- property preprocessed_data: DataFrame¶
DataFrame after drop/impute/encode/dropna — row-aligned with graph nodes.
- property resolved_threshold: float¶
The actual threshold used (resolved from ‘auto’ or the manual value).
- select_representatives(n_reps: int | None = None) list[BallMapper][source]¶
Select n_reps diverse representative BallMapper instances by clustering them based on structural similarity (node count and coverage overlap).
Returns a list of n_reps BallMapper objects.
- property stability_result: StabilityResult | None¶
Stability analysis result (only available if threshold=’auto’).
- property weighted_adjacency: ndarray¶
n×n float64 weighted adjacency matrix.
Note
Configuration classes (PulsarConfig, ImputeSpec, EncodeSpec, etc.) are documented in Configuration.
Analysis¶
Analysis hooks — pure Python utilities that work on the outputs of the Rust layer.
- pulsar.analysis.hooks.cosmic_clusters(cosmic_graph: Graph, method: str = 'agglomerative', n_clusters: int = 5) ndarray[source]¶
Run clustering on the cosmic graph adjacency matrix. Returns an (n,) int array of cluster labels.
method: “agglomerative” | “spectral”
- pulsar.analysis.hooks.cosmic_to_networkx(cg) Graph[source]¶
Convert a CosmicGraph Rust object to a NetworkX graph with ‘weight’ attributes.
- pulsar.analysis.hooks.graph_to_dataframe(ball_mapper, data: DataFrame) DataFrame[source]¶
Return a DataFrame with one row per ball node, including: node_id, size (member count), centroid coordinates, mean/std of each original feature for members in that node.
- pulsar.analysis.hooks.label_points(ball_mapper, n: int) ndarray[source]¶
Return an (n,) int64 array: for each data point, the ID of its first ball assignment (-1 if not covered by any ball).
- pulsar.analysis.hooks.membership_matrix(ball_mapper, n: int) ndarray[source]¶
Return a dense (n, n_balls) binary uint8 matrix. M[i, b] = 1 if point i belongs to ball b.
- pulsar.analysis.hooks.unclustered_points(ball_mapper, n: int) list[int][source]¶
Return list of point indices not covered by any ball.
Dataset characterization for geometry-aware parameter suggestions.
Probes raw data geometry (k-NN distances, PCA variance) to provide raw facts to the agent. The agent must reason about these facts to build a configuration.
- class pulsar.analysis.characterization.ColumnProfile(name: str, dtype: str, is_numeric: bool, n_unique: int, n_missing: int, missing_pct: float, sample_values: list[str], mean: float | None, std: float | None, min_val: float | None, max_val: float | None, top_values: list[tuple[str, int]] | None)[source]¶
Bases: object
Per-column metadata for LLM preprocessing decisions.
- dtype: str¶
- is_numeric: bool¶
- max_val: float | None¶
- mean: float | None¶
- min_val: float | None¶
- missing_pct: float¶
- n_missing: int¶
- n_unique: int¶
- name: str¶
- sample_values: list[str]¶
- std: float | None¶
- top_values: list[tuple[str, int]] | None¶
- class pulsar.analysis.characterization.DatasetProfile(n_samples: int, n_features: int, n_columns_total: int, missingness_pct: float, knn_k5_mean: float, knn_k10_mean: float, knn_k20_mean: float, pca_cumulative_variance: list[tuple[int, float]], column_profiles: list[ColumnProfile])[source]¶
Bases: object
Raw measurements only — no derived decisions.
- column_profiles: list[ColumnProfile]¶
- knn_k10_mean: float¶
- knn_k20_mean: float¶
- knn_k5_mean: float¶
- missingness_pct: float¶
- n_columns_total: int¶
- n_features: int¶
- n_samples: int¶
- pca_cumulative_variance: list[tuple[int, float]]¶
- class pulsar.analysis.characterization.NumericProfile(knn_k5_mean: float, knn_k10_mean: float, knn_k20_mean: float, knn_p5: float, knn_p25: float, knn_p50: float, knn_p75: float, knn_p95: float, pca_cumulative_variance: list[tuple[int, float]], n_features: int, n_samples_profiled: int)[source]¶
Bases: object
k-NN and PCA geometry of an arbitrary numeric matrix.
Shared math core used by both raw characterization and processed-space calibration. No policy decisions — pure measurement.
- knn_k10_mean: float¶
- knn_k20_mean: float¶
- knn_k5_mean: float¶
- knn_p25: float¶
- knn_p5: float¶
- knn_p50: float¶
- knn_p75: float¶
- knn_p95: float¶
- n_features: int¶
- n_samples_profiled: int¶
- pca_cumulative_variance: list[tuple[int, float]]¶
- pulsar.analysis.characterization.characterize_dataset(csv_path: str, subsample: int = 1000, seed: int = 42, *, dataframe: DataFrame | None = None) DatasetProfile[source]¶
Probes dataset geometry before fitting to return raw geometric facts.
- Parameters:
csv_path – Path to CSV file (must have >=2 numeric columns)
subsample – Max rows to analyze (for speed on large datasets)
seed – Random seed for reproducibility
dataframe – Optional pre-loaded DataFrame. When provided, csv_path is ignored for reading (but still used for logging/identification).
- Returns:
DatasetProfile containing pure empirical facts.
- Raises:
ValueError – If CSV has fewer than 2 numeric columns
FileNotFoundError – If CSV file not found
- pulsar.analysis.characterization.profile_numeric_matrix(X: ndarray, subsample: int = 1000, seed: int = 42, dims_to_probe: list[int] | None = None) NumericProfile[source]¶
Compute k-NN distances and PCA variance on an arbitrary numeric matrix.
This is the shared math core used by both characterize_dataset() (raw space) and processed-space calibration inside create_config.
- Parameters:
X – 2-D float64 array, already imputed (no NaN) and scaled.
subsample – Max rows to analyze.
seed – Random seed for reproducibility.
dims_to_probe – PCA dimensions to test. Defaults to [2, 3, 5, 10, 15, 20], clipped to feature count.
- Returns:
NumericProfile with k-NN means and PCA cumulative variance.
Representations¶
TemporalCosmicGraph — Cosmic Graph analysis for longitudinal time-series data.
This module extends Pulsar to handle data where the same set of nodes (e.g., patients) are observed across multiple time steps. Instead of a single 2D weighted adjacency matrix, we work with a 3D tensor W[i, j, t] representing edge weights at each time step.
## Core Data Structure
The temporal weighted adjacency tensor has shape (n, n, T) where:
- n is the number of nodes (fixed across time)
- T is the number of time steps
- W[i, j, t] ∈ [0, 1] is the normalized co-membership weight at time t
## Aggregation Strategies
Given the 3D tensor, we provide several methods to collapse into summary 2D graphs:
| Aggregation | Formula | Interpretation |
|-------------|---------|----------------|
| persistence | mean_t(W > τ) | Stable relationships across time |
| mean | mean_t(W) | Average similarity |
| recency | Σ λ^(T-1-t) · W / Σ λ^(T-1-t) | Current state emphasis |
| volatility | var_t(W) | Relationship instability |
| trend | slope of linear fit | Converging/diverging trajectories |
| change_point | max \|W[t+1] - W[t]\| | Sudden state transitions |
## Example Usage
```python
from pulsar.representations import TemporalCosmicGraph

# Build from time-indexed snapshots
tcg = TemporalCosmicGraph.from_snapshots(
    snapshots=[X_t0, X_t1, X_t2, ...],  # List of (n, features) arrays
    config=config,
)

# Access raw 3D tensor
tensor = tcg.tensor  # shape (n, n, T)

# Compute aggregated graphs
G_persist = tcg.persistence_graph(threshold=0.1)
G_mean = tcg.mean_graph()
G_recent = tcg.recency_graph(decay=0.9)
G_volatile = tcg.volatility_graph()
G_trend = tcg.trend_graph()
G_change = tcg.change_point_graph()

# Convert to NetworkX
G = tcg.to_networkx(aggregation="persistence")
```
- class pulsar.representations.temporal.TemporalCosmicGraph(tensor: ndarray, threshold: float = 0.0)[source]¶
Bases: object
Cosmic Graph for longitudinal time-series data.
Stores a 3D tensor W[i, j, t] of edge weights and provides methods to aggregate into summary 2D graphs.
- property T: int¶
Number of time steps.
- change_point_graph() ndarray[source]¶
Compute change-point graph: maximum absolute change between consecutive time steps.
W_change[i,j] = max_t |W[i,j,t+1] - W[i,j,t]|
Clinical meaning: Identifies sudden state transitions — acute events, medication changes, procedure effects.
- Returns:
2D array of shape (n, n) with non-negative values.
- Return type:
np.ndarray
- classmethod from_snapshots(snapshots: list[ndarray], config: PulsarConfig, threshold: float = 0.0) TemporalCosmicGraph[source]¶
Build a TemporalCosmicGraph from time-indexed data snapshots.
Runs the standard Pulsar pipeline (scale → PCA → BallMapper → pseudo-Laplacian) independently at each time step, then stacks results into a 3D tensor.
- Parameters:
snapshots (list[np.ndarray]) – List of T arrays, each of shape (n, features_t). The number of rows n must be consistent across all snapshots (same node set over time).
config (PulsarConfig) – Pulsar configuration specifying PCA dimensions, seeds, epsilon values, etc.
threshold (float) – Default threshold for binary adjacency operations.
- Returns:
Instance with 3D tensor of shape (n, n, T).
- Return type:
TemporalCosmicGraph
- mean_graph() ndarray[source]¶
Compute mean graph: average edge weight across all time steps.
W_mean[i,j] = mean_t(W[i,j,t])
Clinical meaning: Overall similarity accounting for all observations equally.
- Returns:
2D array of shape (n, n) with values in [0, 1].
- Return type:
np.ndarray
- property n: int¶
Number of nodes.
- persistence_graph(threshold: float | None = None) ndarray[source]¶
Compute persistence graph: fraction of time steps where edge exceeds threshold.
W_persist[i,j] = mean_t(W[i,j,t] > τ)
Clinical meaning: Identifies node pairs that are always similar — stable relationships that persist across the observation window.
- Parameters:
threshold (float, optional) – Edge weight threshold. Defaults to instance threshold.
- Returns:
2D array of shape (n, n) with values in [0, 1].
- Return type:
np.ndarray
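The persistence formula reduces to a one-line NumPy expression (illustrative only; the shipped method may handle edge cases differently):

```python
import numpy as np

def persistence_graph(W: np.ndarray, tau: float) -> np.ndarray:
    # Fraction of time steps where each edge weight exceeds tau
    return (W > tau).mean(axis=2)
```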
- recency_graph(decay: float = 0.9) ndarray[source]¶
Compute recency-weighted graph: exponentially decayed sum favoring recent observations.
W_recent[i,j] = Σ_t λ^(T-1-t) · W[i,j,t] / Σ_t λ^(T-1-t)
where λ ∈ (0, 1) is the decay factor.
Clinical meaning: Current similarity for real-time decision support, where recent observations matter more than distant history.
- Parameters:
decay (float) – Decay factor λ in (0, 1). Values closer to 1 make the weights more uniform across time (less recency emphasis), while smaller values place more weight on the most recent steps. Default 0.9 means each step back is weighted 0.9x the previous.
- Returns:
2D array of shape (n, n) with values in [0, 1].
- Return type:
np.ndarray
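The decay-weighted sum above can be sketched as (an illustrative NumPy reading of the formula, not the shipped implementation):

```python
import numpy as np

def recency_graph(W: np.ndarray, decay: float = 0.9) -> np.ndarray:
    # Exponential weights lambda^(T-1-t), normalized so they sum to 1;
    # the oldest step gets the smallest weight.
    T = W.shape[2]
    w = decay ** np.arange(T - 1, -1, -1)
    return (W * w).sum(axis=2) / w.sum()
```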
- property shape: tuple[int, int, int]¶
Shape of the tensor (n, n, T).
- slice(start: int = 0, end: int | None = None) TemporalCosmicGraph[source]¶
Extract a time-range subset of the tensor.
- Parameters:
start (int) – Start time index (inclusive).
end (int, optional) – End time index (exclusive). Defaults to T.
- Returns:
New instance with sliced tensor.
- Return type:
TemporalCosmicGraph
- property tensor: ndarray¶
3D weighted adjacency tensor of shape (n, n, T).
- to_networkx(aggregation: Literal['persistence', 'mean', 'recency', 'volatility', 'trend', 'change_point'] = 'persistence', threshold: float | None = None, **kwargs) Graph[source]¶
Convert an aggregated graph to NetworkX format.
- Parameters:
aggregation (str) – Which aggregation method to use. One of: “persistence”, “mean”, “recency”, “volatility”, “trend”, “change_point”.
threshold (float, optional) – Edge weight threshold for including edges. Defaults to instance threshold. For aggregation="persistence", this value is also used as the persistence threshold passed to persistence_graph.
**kwargs – Additional arguments passed through to the selected aggregation method (e.g., decay=0.9 for recency_graph). Unsupported arguments raise TypeError.
- Returns:
NetworkX graph with ‘weight’ edge attributes.
- Return type:
nx.Graph
- trend_graph() ndarray[source]¶
Compute trend graph: slope of linear regression over time for each edge.
W_trend[i,j] = slope of linear fit to W[i,j,:]
Clinical meaning: Positive values indicate converging nodes (becoming more similar over time), negative values indicate diverging nodes.
- Returns:
2D array of shape (n, n). Values can be positive or negative.
- Return type:
np.ndarray
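The per-edge slope can be computed in closed form without looping over edges. A vectorized sketch, assuming an ordinary least-squares fit over time indices 0..T-1 (illustrative, not the shipped implementation):

```python
import numpy as np

def trend_graph(W: np.ndarray) -> np.ndarray:
    # OLS slope = cov(t, w) / var(t), vectorized over all (i, j) pairs.
    # Centering t makes sum(tc) = 0, so (W * tc).sum equals sum(tc * (w - wbar)).
    T = W.shape[2]
    t = np.arange(T)
    tc = t - t.mean()
    return (W * tc).sum(axis=2) / (tc ** 2).sum()
```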
- volatility_graph() ndarray[source]¶
Compute volatility graph: temporal variance of each edge.
W_volatile[i,j] = var_t(W[i,j,t])
Clinical meaning: Identifies node pairs whose similarity is unstable — one or both may be on a trajectory (deteriorating, responding to treatment).
- Returns:
2D array of shape (n, n) with non-negative values.
- Return type:
np.ndarray
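As with the other aggregations, volatility is a single reduction over the time axis (illustrative sketch; the shipped method may use a different variance convention):

```python
import numpy as np

def volatility_graph(W: np.ndarray) -> np.ndarray:
    # Temporal (population) variance of each edge weight
    return W.var(axis=2)
```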
Runtime¶
Utility functions for Pulsar.
- pulsar.runtime.utils.build_cumulative_fractions(stages: list[tuple[str, float]]) list[tuple[str, float]][source]¶
Return [(label, cumulative_fraction), …] with final entry pinned to 1.0.
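A minimal sketch of the described behavior, assuming the input fractions are relative stage costs that get normalized (the real helper may differ):

```python
def build_cumulative_fractions(stages):
    """[(label, frac), ...] -> [(label, cumulative_frac), ...], last pinned to 1.0."""
    total = sum(f for _, f in stages) or 1.0
    out, acc = [], 0.0
    for label, frac in stages:
        acc += frac / total
        out.append((label, acc))
    if out:
        # Pin the final entry to exactly 1.0 to absorb floating-point drift
        out[-1] = (out[-1][0], 1.0)
    return out
```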
- pulsar.runtime.utils.rayon_thread_override(workers: int | None)[source]¶
Temporarily override Rayon worker count for Rust ops that respect RAYON_NUM_THREADS. Restores the previous value on exit.
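An equivalent environment-variable override can be sketched as a context manager (illustrative; the shipped helper may handle more edge cases):

```python
import os
from contextlib import contextmanager

@contextmanager
def rayon_thread_override(workers):
    """Set RAYON_NUM_THREADS for the duration of the block, then restore."""
    key = "RAYON_NUM_THREADS"
    previous = os.environ.get(key)
    if workers is not None:
        os.environ[key] = str(workers)
    try:
        yield
    finally:
        # Restore the prior value, or remove the variable if it was unset
        if previous is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = previous
```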
Rich progress bar helpers for ThemaRS.fit() and fit_multi().
Requires the ‘rich’ package (already included in the ‘demos’ dependency group). Install with: pip install rich
- pulsar.runtime.progress.fit_multi_with_progress(model: ThemaRS, datasets: list[pd.DataFrame]) ThemaRS[source]¶
Run model.fit_multi() with a transient rich progress bar.
- Parameters:
model – Unfitted ThemaRS instance.
datasets – List of DataFrames (same points, different representations).
- Returns:
The fitted model (for method chaining).
- Raises:
ImportError – If ‘rich’ is not installed.
- pulsar.runtime.progress.fit_with_progress(model: ThemaRS, data: pd.DataFrame | None = None, **fit_kwargs) ThemaRS[source]¶
Run model.fit() with a transient rich progress bar.
The bar disappears on completion, keeping notebook output clean. Uses the model’s progress_callback mechanism — zero overhead on Rust stages.
- Parameters:
model – Unfitted ThemaRS instance.
data – Input DataFrame (optional if config specifies a data path).
**fit_kwargs – Forwarded to model.fit() (e.g. _precomputed_embeddings).
- Returns:
The fitted model (for method chaining).
- Raises:
ImportError – If ‘rich’ is not installed.
Example:
```python
from pulsar.pipeline import ThemaRS
from pulsar.runtime.progress import fit_with_progress

model = fit_with_progress(ThemaRS("params.yaml"))
graph = model.cosmic_graph
```
Utilities for hashing configuration-sensitive pipeline artifacts.
- pulsar.runtime.fingerprint.pca_fingerprint(cfg, n_rows: int, dataframe=None) str[source]¶
Compute a fingerprint for PCA configuration and data shape.
Used to detect when cached PCA embeddings can be reused (same data + PCA params). Includes data path metadata, preprocessing config, PCA config, and raw input schema so cached embeddings are only reused when the PCA input matrix is identical.
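The general shape of such a fingerprint can be sketched with hashlib. The function name and payload fields below are hypothetical; the real implementation hashes Pulsar-specific config fields:

```python
import hashlib
import json

def fingerprint(config: dict, n_rows: int, columns: list[str]) -> str:
    # Hash a canonical JSON rendering of everything that affects the PCA input,
    # so any change to config, row count, or schema yields a new fingerprint.
    payload = json.dumps(
        {"config": config, "n_rows": n_rows, "columns": columns},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()[:16]
```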
MCP Server¶
FastMCP Server for Pulsar.
Exposes “Thick Tools” for topological data analysis and interpretation.
- class pulsar.mcp.server.SweepRecord(timestamp: 'float', config_yaml: 'str', metrics: 'dict')[source]¶
Bases: object
- config_yaml: str¶
- metrics: dict¶
- timestamp: float¶
- async pulsar.mcp.server.append_dataset_chunk(upload_id: str, chunk: str, encoding: str = 'base64', ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Append one chunk to a staged dataset upload. Use base64 encoding by default to avoid newline and control-character corruption.
- async pulsar.mcp.server.begin_dataset_upload(filename: str, media_type: str = 'text/csv', ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Begin a staged server-side upload for a dataset that is not reachable by path. Use this for larger sandboxed uploads, then append chunks and finalize to get dataset_id.
- async pulsar.mcp.server.characterize_dataset(csv_path: str = '', dataset_id: str = '', ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Probes dataset geometry to return raw facts (N, features, variance curve, k-NN mean). Prefer dataset_id after ingest. Use csv_path only for host-visible files.
- async pulsar.mcp.server.compare_clusters_tool(cluster_a: int, cluster_b: int, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Perform pairwise statistical tests between two clusters.
- async pulsar.mcp.server.compare_sweeps(run_a: str, run_b: str, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Compare two persisted sweep runs by config and graph metrics.
- async pulsar.mcp.server.create_config(dataset_id: str, intent: str = '', ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Generate canonical Pulsar YAML for an ingested dataset_id.
Calibrates epsilon and PCA dimensions against the processed feature space (after recommended preprocessing + scaling), not raw columns.
- async pulsar.mcp.server.diagnose_cosmic_graph(ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._CurrentContext object>) str[source]¶
Diagnose the fitted cosmic graph quality by returning pure GraphMetrics. Interpret these metrics (e.g. density, component distribution) given N.
- async pulsar.mcp.server.explain_suggestion(config_yaml: str, dataset_geometry: str, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._CurrentContext object>) str[source]¶
Explains the mathematical reasoning behind a specific parameter suggestion based on raw geometry.
- Parameters:
config_yaml – The YAML config to explain.
dataset_geometry – JSON string of the dataset geometry summary.
- Returns:
A Markdown explanation of WHY these parameters were chosen.
- async pulsar.mcp.server.export_labeled_data(cluster_names: dict[int, str], output_path: str, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._CurrentContext object>) str[source]¶
Assign semantic names to clusters and export the labeled dataset to CSV.
- async pulsar.mcp.server.finalize_dataset_upload(upload_id: str, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Finalize a staged upload and register it as a dataset_id for downstream tools.
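Client-side, the staged flow (begin → append chunks → finalize) amounts to base64-encoding the file and sending it in pieces. A sketch of the chunking step, with an arbitrary chunk size (the helper names below are hypothetical):

```python
import base64

def chunk_base64(raw: bytes, chunk_size: int = 64_000) -> list[str]:
    """Encode bytes as base64 and split into string chunks suitable for append_dataset_chunk."""
    encoded = base64.b64encode(raw).decode("ascii")
    return [encoded[i:i + chunk_size] for i in range(0, len(encoded), chunk_size)]

def reassemble(chunks: list[str]) -> bytes:
    # Concatenating the chunks restores the original base64 string losslessly
    return base64.b64decode("".join(chunks))
```

Base64 keeps newlines and control characters intact across transport, which is why the append tool recommends it.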
- async pulsar.mcp.server.generate_cluster_dossier(method: str = 'auto', max_k: int = 15, edge_weight_threshold: float = 0.0, format: str = 'json', ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Generate a statistical dossier of the topological clusters.
- Parameters:
method – Clustering method (“auto”, “spectral”, “components”).
max_k – Maximum k for spectral clustering search.
edge_weight_threshold – Drop edges with weight <= this value before clustering. Edge weights are the fraction of ball maps that placed two points together. Use weight percentiles from diagnose_cosmic_graph to choose a value (e.g. weight_p50 to keep only the stronger half of edges).
format – Response format. “json” (structured fields only, default), “markdown” (human-readable summary only), or “full” (both).
- async pulsar.mcp.server.get_experiment_history(ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._CurrentContext object>) str[source]¶
Returns a markdown table of all topological sweeps run in the current session. Use this to reason about your trajectory across multiple iterations.
- Returns:
Markdown table of history. Returns an empty table if no sweeps have been run.
- async pulsar.mcp.server.get_runtime_context(ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Return the MCP server runtime context so agents can reason about path visibility and handle lifecycle before attempting file-based operations.
- async pulsar.mcp.server.get_threshold_stability_curve(ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._CurrentContext object>) str[source]¶
Return the full component-count-vs-edge-weight-threshold curve.
Uses H0 persistent homology on the cosmic graph’s weighted adjacency to show how many connected components exist at each edge weight threshold. Use this to reason about alternative clustering thresholds after the initial auto-clustering.
- Returns:
JSON with thresholds, component_counts, top plateaus, and the auto-selected threshold (midpoint of longest valid plateau).
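The component-count curve can be sketched with a union-find over thresholded edges (illustrative; the server computes this via H0 persistent homology):

```python
import numpy as np

def component_counts(weighted_adj: np.ndarray, thresholds: list[float]) -> list[int]:
    """Number of connected components when keeping edges with weight > threshold."""
    n = weighted_adj.shape[0]
    counts = []
    for tau in thresholds:
        parent = list(range(n))

        def find(x):
            # Find root with path halving
            while parent[x] != x:
                parent[x] = parent[parent[x]]
                x = parent[x]
            return x

        # Union endpoints of every surviving edge
        for i in range(n):
            for j in range(i + 1, n):
                if weighted_adj[i, j] > tau:
                    ri, rj = find(i), find(j)
                    if ri != rj:
                        parent[ri] = rj
        counts.append(len({find(i) for i in range(n)}))
    return counts
```

Raising the threshold can only remove edges, so the curve is monotone non-decreasing in component count; long flat plateaus indicate stable clusterings.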
- async pulsar.mcp.server.get_topological_skeleton(run_id: str = '', ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Return structured graph connectivity for the latest run or an explicit run_id.
- async pulsar.mcp.server.ingest_dataset(path: str, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Register a host-visible absolute dataset path and return a stable dataset_id handle. Use this only when the MCP server can read the path directly.
- async pulsar.mcp.server.ingest_dataset_base64(filename: str, content_base64: str, media_type: str = 'text/csv', ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Persist a small or medium uploaded dataset sent as base64 and return dataset_id. Prefer this over raw text content for one-shot uploads. Use staged upload for larger files.
- async pulsar.mcp.server.ingest_dataset_content(filename: str, content: str, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Persist uploaded or sandbox-local dataset content into the MCP server cache and return a stable dataset_id handle. This is a legacy text-only fallback. Prefer ingest_dataset_base64 for one-shot uploads and staged upload for larger files.
- async pulsar.mcp.server.recommend_preprocessing(dataset_geometry: str = '', dataset_id: str = '', ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Analyze column profiles and return preprocessing recommendations. Prefer dataset_id after ingest; accepts dataset_geometry as fallback.
- Parameters:
dataset_geometry – The raw JSON string from characterize_dataset.
dataset_id – Preferred dataset handle. When provided, characterizes the dataset automatically (dataset_geometry is ignored).
- Returns:
JSON with preprocessing_yaml, per-column rationale, and expansion estimate.
- async pulsar.mcp.server.refine_config(config_yaml: str, overrides: dict[str, Any]) str[source]¶
Apply constrained overrides to canonical Pulsar YAML and return normalized YAML.
- async pulsar.mcp.server.repair_preprocessing_config(error_message: str, config_yaml: str, dataset_geometry: str, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._CurrentContext object>) str[source]¶
Given a preprocessing error from run_topological_sweep, produce a corrected config_yaml with a change log of what was fixed and why.
Handles: NaN remaining, non-numeric columns, coercion failure, all-missing columns, and cardinality violations.
- Parameters:
error_message – The full error text from the failed sweep.
config_yaml – The config_yaml that caused the error.
dataset_geometry – The raw JSON string from characterize_dataset.
- Returns:
Markdown with error classification, change log table, and patched config_yaml.
- async pulsar.mcp.server.run_topological_sweep(config_path: str = '', config_yaml: str = '', dataset_id: str = '', save_config: bool = False, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Run the Pulsar topological sweep pipeline on a dataset.
Returns a markdown diff of parameter and metric changes compared to your previous run, followed by the full execution summary.
- Parameters:
config_path – Path to a params.yaml file on disk.
config_yaml – Inline YAML string (preferred).
dataset_id – Preferred dataset handle when data has already been ingested.
save_config – If True, persist the resolved config YAML to disk.
- async pulsar.mcp.server.suggest_initial_config(dataset_geometry: str, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._CurrentContext object>) str[source]¶
Generate an initial configuration YAML based on the raw dataset geometry. Deprecated: prefer create_config(dataset_id) for processed-space calibration.
- Parameters:
dataset_geometry – The raw JSON string from characterize_dataset.
- Returns:
JSON with config_yaml and calibration provenance.
- async pulsar.mcp.server.validate_config(config_yaml: str, dataset_id: str = '', ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._OptionalCurrentContext object>) str[source]¶
Validate full Pulsar config shape and normalize it into canonical YAML. Prefer dataset_id once data has been ingested.
- async pulsar.mcp.server.validate_preprocessing_config(config_yaml: str, ctx: ~fastmcp.server.context.Context = <fastmcp.server.dependencies._CurrentContext object>) str[source]¶
Dry-run the preprocessing stage only against session data — no PCA, no BallMapper, no sweep cost. Use this to confirm a config is valid before run_topological_sweep.
Requires a prior run_topological_sweep call (to populate session data).
- Parameters:
config_yaml – Inline YAML config string to validate.
- Returns:
PASS with schema summary, or a structured error matching repair_preprocessing_config input format.