Source code for trailed.plugins.sklearn.channels

"""
ECT transformer with channel support for categorical features.

This module provides a transformer that computes separate ECTs for each
categorical channel in the point cloud.
"""

from typing import Literal, Optional

import numpy as np
from numpy.typing import ArrayLike, NDArray

import trailed_rust
from trailed.sampling import generate_directions as _generate_directions_func


class EctChannelTransformer:
    """ECT transformer with channel support for categorical features.

    This transformer computes separate ECTs for each categorical channel in
    the point cloud, useful for molecules with different atom types or other
    categorically-labeled point clouds.

    Parameters
    ----------
    num_thetas : int, default=64
        Number of directions to sample.
    resolution : int, default=64
        Number of threshold steps.
    radius : float, default=1.0
        Radius of the threshold interval.
    scale : float, default=500.0
        Scale factor for sigmoid approximation.
    max_channels : int or None, default=None
        Maximum number of channels. If None, inferred from data.
    sampling_method : str, default="uniform"
        Method for generating directions.
    flatten : bool, default=True
        If True, flatten the ECT to a 1D feature vector.
    normalized : bool, default=False
        If True, divide each sample's ECT by its maximum value (plus a small
        epsilon to avoid division by zero).
    seed : int, default=42
        Random seed for direction generation.

    Attributes
    ----------
    directions_ : ndarray or None
        Direction vectors produced by ``fit``; ``None`` until fitted.
    ambient_dim_ : int or None
        Size of the last axis of the data seen in ``fit``.
    n_channels_ : int or None
        Number of channels used by ``transform``.

    Examples
    --------
    >>> from trailed.plugins.sklearn import EctChannelTransformer
    >>> import numpy as np
    >>> # Point clouds with channel labels
    >>> X = np.random.randn(10, 50, 3).astype(np.float32)
    >>> channels = np.random.randint(0, 3, size=(10, 50))  # 3 channels
    >>> transformer = EctChannelTransformer(max_channels=3)
    >>> features = transformer.fit_transform(X, channels=channels)
    """

    # Single source of truth for the constructor hyper-parameters; used by
    # get_params() and by set_params() to reject anything else.
    _PARAM_NAMES = (
        "num_thetas",
        "resolution",
        "radius",
        "scale",
        "max_channels",
        "sampling_method",
        "flatten",
        "normalized",
        "seed",
    )

    def __init__(
        self,
        num_thetas: int = 64,
        resolution: int = 64,
        radius: float = 1.0,
        scale: float = 500.0,
        max_channels: Optional[int] = None,
        sampling_method: Literal[
            "uniform", "structured_2d", "multiview", "spherical_grid"
        ] = "uniform",
        flatten: bool = True,
        normalized: bool = False,
        seed: int = 42,
    ):
        self.num_thetas = num_thetas
        self.resolution = resolution
        self.radius = radius
        self.scale = scale
        self.max_channels = max_channels
        self.sampling_method = sampling_method
        self.flatten = flatten
        self.normalized = normalized
        self.seed = seed

        # Fitted state; populated by fit() and cleared by set_params().
        self.directions_: Optional[NDArray] = None
        self.ambient_dim_: Optional[int] = None
        self._lin: Optional[NDArray] = None
        self.n_channels_: Optional[int] = None

    def _generate_directions(self, ambient_dim: int) -> NDArray:
        """Generate direction vectors."""
        return _generate_directions_func(
            self.num_thetas, ambient_dim, self.sampling_method, self.seed
        )

    def fit(
        self,
        X: ArrayLike,
        y=None,
        channels: Optional[ArrayLike] = None,
    ) -> "EctChannelTransformer":
        """Fit the transformer.

        The number of channels is ``max_channels`` when that is set; otherwise
        it is ``max(channels) + 1`` when ``channels`` is given, and ``1`` when
        it is not.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_points, n_dims)
            Training point clouds.
        y : None
            Ignored.
        channels : array-like of shape (n_samples, n_points), optional
            Channel indices for each point.

        Returns
        -------
        self : EctChannelTransformer
            The fitted transformer.

        Raises
        ------
        ValueError
            If ``X`` is not 3-dimensional, or if the channel count has to be
            inferred from an empty ``channels`` array.
        """
        X = np.asarray(X, dtype=np.float32)
        if X.ndim != 3:
            raise ValueError(
                f"Expected 3D array (n_samples, n_points, n_dims), got {X.ndim}D"
            )

        # Resolve the channel count before touching any fitted attribute so a
        # failure here cannot leave the transformer half-fitted.
        if self.max_channels is not None:
            n_channels = self.max_channels
        elif channels is not None:
            channels = np.asarray(channels, dtype=np.int64)
            if channels.size == 0:
                raise ValueError(
                    "Cannot infer the number of channels from an empty "
                    "`channels` array; pass max_channels explicitly"
                )
            n_channels = int(np.max(channels)) + 1
        else:
            n_channels = 1

        self.ambient_dim_ = X.shape[2]
        self.directions_ = self._generate_directions(self.ambient_dim_)
        self._lin = trailed_rust.generate_lin(self.radius, self.resolution)
        self.n_channels_ = n_channels
        return self

    def transform(
        self,
        X: ArrayLike,
        channels: Optional[ArrayLike] = None,
    ) -> NDArray:
        """Transform point clouds to ECT features.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_points, n_dims)
            Point clouds to transform.
        channels : array-like of shape (n_samples, n_points), optional
            Channel indices for each point. When omitted, every point is
            assigned to channel 0.

        Returns
        -------
        features : ndarray
            ECT features with shape depending on flatten parameter: one row
            of flattened features per sample when ``flatten`` is True,
            otherwise the per-sample ECT arrays stacked along a new leading
            axis.

        Raises
        ------
        RuntimeError
            If the transformer has not been fitted.
        ValueError
            If ``X`` is not 3-dimensional, contains no samples, has a
            different number of dimensions than the data seen in ``fit``, or
            if ``channels`` does not have shape ``(n_samples, n_points)``.
        """
        if self.directions_ is None:
            raise RuntimeError("Transformer not fitted. Call fit() first.")

        X = np.asarray(X, dtype=np.float32)
        if X.ndim != 3:
            raise ValueError(
                f"Expected 3D array (n_samples, n_points, n_dims), got {X.ndim}D"
            )

        n_samples, n_points, n_dims = X.shape
        if n_samples == 0:
            raise ValueError("Cannot transform an empty batch of point clouds")
        if self.ambient_dim_ is not None and n_dims != self.ambient_dim_:
            raise ValueError(
                f"Expected point clouds with {self.ambient_dim_} dimensions "
                f"(as seen in fit), got {n_dims}"
            )

        if channels is None:
            channels = np.zeros((n_samples, n_points), dtype=np.int64)
        else:
            channels = np.asarray(channels, dtype=np.int64)
            if channels.shape != (n_samples, n_points):
                raise ValueError(
                    f"Expected channels of shape ({n_samples}, {n_points}), "
                    f"got {channels.shape}"
                )

        # Each sample is sent to the kernel as a batch of one, so the batch
        # index array is all zeros and identical for every sample.
        batch = np.zeros(n_points, dtype=np.int64)

        results = []
        for points, ch in zip(X, channels):
            # Project the points onto every direction.
            nh = points @ self.directions_
            ect = trailed_rust.compute_ect_channels_forward(
                nh, batch, ch, self._lin, 1, self.n_channels_, self.scale
            )
            ect = ect[0]  # Remove batch dimension
            if self.normalized:
                ect = ect / (np.max(ect) + 1e-8)
            results.append(ect)

        ects = np.stack(results, axis=0)
        if self.flatten:
            return ects.reshape(n_samples, -1)
        return ects

    def fit_transform(
        self,
        X: ArrayLike,
        y=None,
        channels: Optional[ArrayLike] = None,
    ) -> NDArray:
        """Fit and transform in one step."""
        return self.fit(X, y, channels=channels).transform(X, channels=channels)

    def get_params(self, deep: bool = True) -> dict:
        """Get parameters for this estimator.

        Parameters
        ----------
        deep : bool, default=True
            Accepted for API compatibility; it has no effect here.

        Returns
        -------
        params : dict
            Mapping of constructor parameter names to their current values.
        """
        return {name: getattr(self, name) for name in self._PARAM_NAMES}

    def set_params(self, **params) -> "EctChannelTransformer":
        """Set parameters for this estimator.

        Only constructor parameters are accepted. All names are validated
        before any value is assigned, so a rejected call leaves the estimator
        unchanged. A successful call clears the fitted state, so ``fit`` must
        be called again before ``transform``.

        Parameters
        ----------
        **params
            Constructor parameter names and their new values.

        Returns
        -------
        self : EctChannelTransformer

        Raises
        ------
        ValueError
            If any name is not a constructor parameter.
        """
        for key in params:
            if key not in self._PARAM_NAMES:
                raise ValueError(f"Invalid parameter: {key}")

        for key, value in params.items():
            setattr(self, key, value)

        # Hyper-parameters changed: anything derived from them is stale.
        self.directions_ = None
        self.ambient_dim_ = None
        self._lin = None
        self.n_channels_ = None
        return self