# Source code for mamut.preprocessing.preprocessing

import warnings
from typing import List, Literal, Optional, Tuple, Union

import numpy as np
import pandas as pd
from sklearn.pipeline import Pipeline

from mamut.preprocessing.handlers import (
    handle_categorical,
    handle_extraction,
    handle_imbalanced,
    handle_missing_categorical,
    handle_missing_numeric,
    handle_outliers,
    handle_scaling,
    handle_selection,
    handle_skewed,
)

# Process-wide warning filters installed at import time.
# NOTE(review): presumably these silence noise raised by third-party
# dependencies rather than by this package -- confirm before removing.

# Ignore DeprecationWarning raised from modules whose name matches the
# regex ``numpy\.ma\.extras``.
warnings.filterwarnings(
    "ignore",
    category=DeprecationWarning,
    module="numpy\\.ma\\.extras",
)
# Ignore DeprecationWarning whose message starts with the text below
# (``message`` is a regex matched against the start of the warning text).
warnings.filterwarnings(
    "ignore",
    category=DeprecationWarning,
    message="Conversion of an array with ndim > 0 to a scalar is deprecated.*",
)


class Preprocessor:
    """
    A class used to preprocess data for machine learning models.

    Attributes
    ----------
    numeric_features : Optional[List[str]]
        List of numeric feature names (inferred from dtypes during fit when
        not configured).
    categorical_features : Optional[List[str]]
        List of categorical feature names (inferred from dtypes during fit
        when not configured).
    num_imputation : Literal["iterative", "knn", "mean", "median", "constant"]
        Method for numeric imputation.
    cat_imputation : Literal["most_frequent", "constant"]
        Method for categorical imputation.
    scaling : Literal["standard", "robust"]
        Method for scaling numeric features.
    feature_selection : bool
        Whether to perform feature selection.
    pca : bool
        Whether to perform PCA for feature extraction.
    profile : Literal["generic_ohe", "tree_ohe", "native_categorical"]
        Preprocessing family (see ``__init__``).
    outlier_removal : bool
        Whether to remove rows flagged by IsolationForest during
        preprocessing.
    imbalanced_resampling : bool
        Whether to perform resampling to handle imbalanced data.
    resampling_strategy : Literal["SMOTE", "undersample", "combine"]
        Strategy for resampling imbalanced data.
    skew_threshold : float
        Threshold for skewness correction.
    pca_threshold : float
        Threshold for PCA feature extraction.
    selection_threshold : float
        Threshold for feature selection.
    imbalance_threshold : float
        Threshold for detecting imbalanced data.
    random_state : Optional[int]
        Random state for reproducibility.

    Methods
    -------
    fit_transform(X, y) -> (features, target)
        Fits the preprocessor and transforms the data.
    transform(X) -> features
        Transforms the data using the fitted preprocessor.
    report() -> dict
        Returns a report of the preprocessing steps.
    _check_fitted()
        Checks if the preprocessor has been fitted.
    """

    def __init__(
        self,
        numeric_features: Optional[List[str]] = None,
        categorical_features: Optional[List[str]] = None,
        num_imputation: Literal[
            "iterative", "knn", "mean", "median", "constant"
        ] = "knn",
        cat_imputation: Literal["most_frequent", "constant"] = "most_frequent",
        scaling: Literal["standard", "robust"] = "standard",
        feature_selection: bool = False,
        pca: bool = False,
        profile: Literal[
            "generic_ohe", "tree_ohe", "native_categorical"
        ] = "generic_ohe",
        outlier_removal: bool = False,
        imbalanced_resampling: bool = True,
        resampling_strategy: Literal["SMOTE", "undersample", "combine"] = "SMOTE",
        skew_threshold: float = 1,
        pca_threshold: float = 0.95,
        selection_threshold: float = 0.05,
        imbalance_threshold: float = 0.10,
        random_state: Optional[int] = 42,
    ) -> None:
        """
        Constructs all the necessary attributes for the Preprocessor object.

        Parameters
        ----------
        numeric_features : Optional[List[str]]
            List of numeric feature names.
        categorical_features : Optional[List[str]]
            List of categorical feature names.
        num_imputation : Literal["iterative", "knn", "mean", "median", "constant"]
            Method for numeric imputation.
        cat_imputation : Literal["most_frequent", "constant"]
            Method for categorical imputation.
        scaling : Literal["standard", "robust"]
            Method for scaling numeric features.
        feature_selection : bool
            Whether to perform feature selection.
        pca : bool
            Whether to perform PCA for feature extraction.
        profile : Literal["generic_ohe", "tree_ohe", "native_categorical"]
            Preprocessing family. ``generic_ohe`` keeps the legacy one-hot,
            skew-correction, and scaling path. ``tree_ohe`` one-hot encodes
            categoricals but skips numeric scaling/skew transforms.
            ``native_categorical`` preserves pandas categorical columns for
            estimators that support them.
        outlier_removal : bool
            Whether to remove rows flagged by IsolationForest. Disabled by
            default because automatic row removal can change the target
            distribution and hurt external validity.
        imbalanced_resampling : bool
            Whether to perform resampling to handle imbalanced data.
        resampling_strategy : Literal["SMOTE", "undersample", "combine"]
            Strategy for resampling imbalanced data.
        skew_threshold : float
            Threshold for skewness correction.
        pca_threshold : float
            Threshold for PCA feature extraction.
        selection_threshold : float
            Threshold for feature selection.
        imbalance_threshold : float
            Threshold for detecting imbalanced data.
        random_state : Optional[int]
            Random state for reproducibility.

        Raises
        ------
        ValueError
            If ``profile`` is unknown, or if ``feature_selection``/``pca`` is
            requested together with the ``native_categorical`` profile.
        """
        if profile not in {"generic_ohe", "tree_ohe", "native_categorical"}:
            raise ValueError(
                "profile must be one of: 'generic_ohe', 'tree_ohe', "
                "'native_categorical'."
            )
        if profile == "native_categorical" and (feature_selection or pca):
            raise ValueError(
                "feature_selection and pca are not compatible with "
                "native_categorical preprocessing."
            )

        # Keep private copies of the user configuration so that every fit can
        # start again from it (the public lists get overwritten by inference).
        self._numeric_features_config = (
            list(numeric_features) if numeric_features is not None else None
        )
        self._categorical_features_config = (
            list(categorical_features) if categorical_features is not None else None
        )

        self.num_imputation = num_imputation
        self.cat_imputation = cat_imputation
        self.scaling = scaling
        self.feature_selection = feature_selection
        self.pca = pca
        self.profile = profile
        self.outlier_removal = outlier_removal
        self.imbalanced_resampling = imbalanced_resampling
        self.resampling_strategy = resampling_strategy
        self.skew_threshold = skew_threshold
        self.pca_threshold = pca_threshold
        self.selection_threshold = selection_threshold
        self.imbalance_threshold = imbalance_threshold
        self.random_state = random_state

        # Initialises every fit-dependent attribute (including the public
        # ``numeric_features`` / ``categorical_features`` lists).
        self._reset_fit_state()

    @property
    def _uses_native_categorical(self) -> bool:
        """True when categorical columns are kept as pandas categoricals."""
        return self.profile == "native_categorical"

    @property
    def _encodes_categorical(self) -> bool:
        """True when categorical columns go through ``handle_categorical``."""
        return not self._uses_native_categorical

    @property
    def _transforms_numeric_shape(self) -> bool:
        """True when numeric columns are skew-corrected and scaled."""
        return self.profile == "generic_ohe"

    def _reset_fit_state(self) -> None:
        """Restore the configured feature lists and clear all fitted state."""
        self.numeric_features = (
            list(self._numeric_features_config)
            if self._numeric_features_config is not None
            else None
        )
        self.categorical_features = (
            list(self._categorical_features_config)
            if self._categorical_features_config is not None
            else None
        )
        self.imbalanced_ = False
        self.missing_ = False
        self.imbalanced_trans_ = None
        self.outlier_trans_ = None
        self.missing_num_trans_ = None
        self.missing_cat_trans_ = None
        self.cat_trans_ = None
        self.skew_trans_ = None
        self.skewed_ = False
        self.scaler_ = None
        self.sel_trans_ = None
        self.ext_trans_ = None
        self.fitted = False
        self.skewed_feature_names_ = []
        self.selected_features_ = None
        self.pca_loadings_ = None
        self.missing_numeric_ = False
        self.missing_categorical_ = False
        self.has_numeric_ = False
        self.has_categorical_ = False
        self.ohe_feature_names_ = []
        self.report_ = None
        self.n_missing_numeric = 0
        self.n_missing_categorical = 0
        self.lambdas_ = []
        self.feature_importances_ = None
        self.original_feature_names_ = None
        self.feature_names_out_ = None

    def _validate_feature_columns(self, X: pd.DataFrame) -> None:
        """
        Check the configured feature lists against the columns of ``X``.

        Raises
        ------
        ValueError
            If a configured feature is absent from ``X`` or a feature is
            configured as both numeric and categorical.
        """
        configured_features = []
        for feature_group in (self.numeric_features, self.categorical_features):
            if feature_group is not None:
                configured_features.extend(feature_group)

        missing_features = sorted(set(configured_features) - set(X.columns))
        if missing_features:
            raise ValueError(
                "Configured preprocessing features are not present in X: "
                f"{missing_features}."
            )

        if self.numeric_features is not None and self.categorical_features is not None:
            overlap = sorted(
                set(self.numeric_features) & set(self.categorical_features)
            )
            if overlap:
                raise ValueError(
                    "Features cannot be both numeric and categorical: "
                    f"{overlap}."
                )

    def _infer_feature_groups(self, X: pd.DataFrame) -> None:
        """
        Fill in whichever feature list was not configured, based on dtypes.

        A column explicitly configured in one group is never inferred into
        the other, so e.g. an integer-coded column listed in
        ``categorical_features`` is not also treated as numeric.
        """
        if self.numeric_features is None:
            taken = set(self.categorical_features or [])
            self.numeric_features = [
                column
                for column in X.select_dtypes(include="number").columns
                if column not in taken
            ]
        if self.categorical_features is None:
            taken = set(self.numeric_features)
            self.categorical_features = [
                column
                for column in X.select_dtypes(exclude="number").columns
                if column not in taken
            ]

    def fit_transform(
        self, X: pd.DataFrame, y: pd.Series
    ) -> Tuple[Union[np.ndarray, pd.DataFrame], np.ndarray]:
        """
        Fits the preprocessor and transforms the data.

        Parameters
        ----------
        X : pd.DataFrame
            The input features.
        y : pd.Series
            The target variable. Other one-dimensional array-likes are
            wrapped in a Series aligned to ``X.index``.

        Returns
        -------
        np.ndarray or pd.DataFrame
            Transformed features. A DataFrame is returned only for the
            ``native_categorical`` profile, so column dtypes survive.
        np.ndarray
            Transformed target variable (rows may have been removed or
            resampled together with the features).

        Raises
        ------
        ValueError
            If ``X`` is not a DataFrame or the configured features are
            inconsistent with its columns.
        """
        self._reset_fit_state()
        if not isinstance(X, pd.DataFrame):
            raise ValueError("Input data must be a pandas DataFrame.")
        if not isinstance(y, pd.Series):
            # Share X's index so row-wise operations keep X and y aligned.
            y = pd.Series(np.asarray(y), index=X.index)
        self._validate_feature_columns(X)

        self.report_ = {}
        self.original_feature_names_ = X.columns.tolist()
        self._infer_feature_groups(X)
        self.report_["features"] = {
            "numeric": self.numeric_features,
            "categorical": self.categorical_features,
        }

        # Imbalance is judged on the raw target, before any row is dropped.
        self.imbalanced_ = bool(
            y.value_counts(normalize=True).min() < self.imbalance_threshold
        )

        X, y = X.copy(), y.copy()
        self.has_numeric_ = len(self.numeric_features) > 0
        self.has_categorical_ = len(self.categorical_features) > 0

        # --- missing values ---------------------------------------------
        if self.has_numeric_:
            self.n_missing_numeric = X[self.numeric_features].isnull().sum().sum()
            if self.n_missing_numeric > 0:
                self.missing_numeric_ = True
                # The native profile leaves numeric NaNs untouched.
                if not self._uses_native_categorical:
                    X, self.missing_num_trans_ = handle_missing_numeric(
                        X, self.numeric_features, self.num_imputation
                    )

        if self.has_categorical_:
            self.n_missing_categorical = (
                X[self.categorical_features].isnull().sum().sum()
            )
            if self.n_missing_categorical > 0:
                self.missing_categorical_ = True
                cat_imputation = (
                    "constant"
                    if self._uses_native_categorical
                    else self.cat_imputation
                )
                X, self.missing_cat_trans_ = handle_missing_categorical(
                    X, self.categorical_features, cat_imputation
                )

        self.missing_ = self.missing_numeric_ or self.missing_categorical_
        if self.missing_:
            self.report_["imputation"] = {}
            if self.missing_numeric_:
                if self.missing_num_trans_ is None:
                    self.report_["imputation"]["numeric"] = {
                        "policy": "preserved_for_native_estimator",
                        "n_missing_numeric": self.n_missing_numeric,
                    }
                else:
                    self.report_["imputation"]["numeric"] = {
                        "transformer": self.missing_num_trans_.__class__.__name__,
                        "n_missing_numeric": self.n_missing_numeric,
                    }
            if self.missing_categorical_:
                self.report_["imputation"]["categorical"] = {
                    "transformer": self.missing_cat_trans_.__class__.__name__,
                    "n_missing_categorical": self.n_missing_categorical,
                }

        # --- outliers (fit-time only; transform never drops rows) ---------
        if self.has_numeric_ and self.outlier_removal:
            n_row_before = X.shape[0]
            X, y, self.outlier_trans_ = handle_outliers(
                X, y, self.numeric_features, random_state=self.random_state
            )
            n_row_after = X.shape[0]
            self.report_["removing_outliers"] = {
                "transformer": self.outlier_trans_.__class__.__name__,
                "n_outliers_removed": n_row_before - n_row_after,
            }

        # --- categorical handling ---------------------------------------
        if self.has_categorical_ and self._encodes_categorical:
            X, self.cat_trans_, self.ohe_feature_names_ = handle_categorical(
                X, self.categorical_features
            )
            self.ohe_feature_names_ = list(self.ohe_feature_names_)
            self.report_["category_encoding"] = {
                "transformer": self.cat_trans_.__class__.__name__,
                "encoded_feature_names": self.ohe_feature_names_,
            }
        elif self.has_categorical_:
            X = self._coerce_native_categorical(X)
            self.report_["category_encoding"] = {
                "transformer": "NativeCategoricalDtype",
                "categorical_feature_names": self.categorical_features,
            }

        if self._uses_native_categorical and self.has_numeric_:
            X = self._coerce_native_numeric(X)

        # --- skew correction and scaling (generic_ohe profile only) -------
        if self.has_numeric_ and self._transforms_numeric_shape:
            (
                X,
                self.skew_trans_,
                self.skewed_feature_names_,
                self.lambdas_,
            ) = handle_skewed(X, self.numeric_features, threshold=self.skew_threshold)
            self.report_["skew_transform"] = {
                "transformer": self.skew_trans_.__class__.__name__,
                "method": self.skew_trans_.method,
                "skewed_feature_names": self.skewed_feature_names_,
                "lambdas": self.lambdas_,
            }

            X, self.scaler_ = handle_scaling(X, self.numeric_features, self.scaling)
            self.report_["scaling"] = {
                "transformer": self.scaler_.__class__.__name__,
            }
        else:
            self.skewed_feature_names_ = []
            self.lambdas_ = []

        # --- feature selection ------------------------------------------
        if self.feature_selection:
            (
                X,
                self.sel_trans_,
                self.selected_features_,
                self.feature_importances_,
            ) = handle_selection(
                X,
                y,
                threshold=self.selection_threshold,
                random_state=self.random_state,
            )
            self.report_["feature_selection"] = {
                "transformer": self.sel_trans_.__class__.__name__,
                "estimator": self.sel_trans_.estimator_.__class__.__name__,
                "selected_features": self.selected_features_,
                "feature_importances": self.feature_importances_,
            }

        # --- feature extraction -----------------------------------------
        if self.pca:
            n_features_before = X.shape[1]
            X, self.ext_trans_, self.pca_loadings_ = handle_extraction(
                X, threshold=self.pca_threshold, random_state=self.random_state
            )
            n_features_after = X.shape[1]
            self.report_["feature_extraction"] = {
                "transformer": self.ext_trans_.__class__.__name__,
                "pca_loadings": self.pca_loadings_,
                "n_features_before": n_features_before,
                "n_features_after": n_features_after,
            }

        # --- class imbalance --------------------------------------------
        wants_resampling = self.imbalanced_resampling and self.imbalanced_
        if wants_resampling and not self._uses_native_categorical:
            n_row_before = X.shape[0]
            X, y, self.imbalanced_trans_ = handle_imbalanced(
                X, y, self.resampling_strategy, random_state=self.random_state
            )
            if self.imbalanced_trans_ is not None:
                n_row_after = X.shape[0]
                self.report_["imbalanced_resampling"] = {
                    "transformer": self.imbalanced_trans_.__class__.__name__,
                    "n_resampled": n_row_after - n_row_before,
                }
            else:
                self.report_["imbalanced_resampling"] = {
                    "skipped": True,
                    "reason": "Insufficient minority samples for SMOTE.",
                    "strategy": self.resampling_strategy,
                }
        elif wants_resampling:
            self.report_["imbalanced_resampling"] = {
                "skipped": True,
                "reason": "Native categorical preprocessing preserves raw feature types.",
                "strategy": self.resampling_strategy,
            }

        self.skewed_ = len(self.skewed_feature_names_) > 0

        # --- output feature names ---------------------------------------
        if self.pca:
            # Name components from the actual output width rather than from
            # the (externally defined) orientation of the loadings matrix.
            self.feature_names_out_ = [f"PC{i + 1}" for i in range(X.shape[1])]
        elif isinstance(X, pd.DataFrame):
            self.feature_names_out_ = X.columns.tolist()
        else:
            self.feature_names_out_ = [
                f"feature_{i}" for i in range(np.asarray(X).shape[1])
            ]

        self.fitted = True

        if isinstance(X, pd.DataFrame) and not self._uses_native_categorical:
            X = X.values
        if isinstance(y, pd.Series):
            y = y.values
        return X, y

    def transform(self, X: pd.DataFrame) -> Union[np.ndarray, pd.DataFrame]:
        """
        Transforms the data using the fitted preprocessor.

        Columns are selected and ordered as they were seen during fit, so
        extra columns are ignored and a different column order is harmless.
        Rows are never removed or resampled here.

        Parameters
        ----------
        X : pd.DataFrame
            The input features.

        Returns
        -------
        np.ndarray or pd.DataFrame
            Transformed features. A DataFrame is returned only for the
            ``native_categorical`` profile.

        Raises
        ------
        RuntimeError
            If the preprocessor has not been fitted.
        ValueError
            If ``X`` is not a DataFrame or lacks a column seen during fit.
        """
        self._check_fitted()
        if not isinstance(X, pd.DataFrame):
            raise ValueError("Input data must be a pandas DataFrame.")

        absent = [
            column
            for column in self.original_feature_names_
            if column not in X.columns
        ]
        if absent:
            raise ValueError(f"Columns seen during fit are missing from X: {absent}.")
        # Indexing with a list yields a new frame in fit-time column order.
        X = X[self.original_feature_names_].copy()

        if self.missing_num_trans_ is not None:
            X[self.numeric_features] = self.missing_num_trans_.transform(
                X[self.numeric_features]
            )
        if self.missing_cat_trans_ is not None:
            X[self.categorical_features] = self.missing_cat_trans_.transform(
                X[self.categorical_features]
            )

        if self.has_categorical_ and self._encodes_categorical:
            encoded_features = self.cat_trans_.transform(X[self.categorical_features])
            encoded_features_df = pd.DataFrame(
                encoded_features,
                columns=self.ohe_feature_names_,
                index=X.index,
            )
            X = X.drop(columns=self.categorical_features).join(encoded_features_df)
        elif self.has_categorical_:
            X = self._coerce_native_categorical(X)

        if self._uses_native_categorical and self.has_numeric_:
            X = self._coerce_native_numeric(X)

        if self.skewed_:
            X[self.skewed_feature_names_] = self.skew_trans_.transform(
                X[self.skewed_feature_names_]
            )
        if self.has_numeric_ and self.scaler_ is not None:
            X[self.numeric_features] = self.scaler_.transform(X[self.numeric_features])

        if self.feature_selection:
            index = X.index
            selected = self.sel_trans_.transform(X)
            X = pd.DataFrame(selected, columns=self.selected_features_, index=index)
        if self.pca:
            X = self.ext_trans_.transform(X)

        if isinstance(X, pd.DataFrame) and not self._uses_native_categorical:
            X = X.values
        return X

    def _coerce_native_categorical(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Return a copy of ``X`` with categorical features as ``category`` dtype.

        Values are first cast to pandas ``string`` dtype and missing entries
        replaced by the literal ``"__missing__"``.
        """
        # NOTE(review): categories are derived from the frame passed in, so
        # fit-time and transform-time frames may end up with different
        # category sets -- confirm the downstream estimator tolerates that.
        X = X.copy()
        for feature in self.categorical_features:
            X[feature] = (
                X[feature].astype("string").fillna("__missing__").astype("category")
            )
        return X

    def _coerce_native_numeric(self, X: pd.DataFrame) -> pd.DataFrame:
        """
        Return a copy of ``X`` with numeric features parsed by ``pd.to_numeric``.

        Unparseable values become NaN (``errors="coerce"``).
        """
        X = X.copy()
        for feature in self.numeric_features:
            X[feature] = pd.to_numeric(X[feature], errors="coerce")
        return X

    def report(self):
        """
        Returns a report of the preprocessing steps.

        Returns
        -------
        dict
            A dictionary containing the report of the preprocessing steps.

        Raises
        ------
        RuntimeError
            If the preprocessor has not been fitted.
        """
        self._check_fitted()
        return self.report_

    def _check_fitted(self):
        """
        Checks if the preprocessor has been fitted.

        Raises
        ------
        RuntimeError
            If the preprocessor has not been fitted.
        """
        if not self.fitted:
            raise RuntimeError("Preprocessor has not been fitted.")