# Source code for mamut.model_selection

import ast
import inspect
import logging
import time
import warnings
from copy import copy
from dataclasses import dataclass
from typing import Callable, List, Literal, Optional

import numpy as np
import optuna
import pandas as pd
from catboost import CatBoostClassifier  # noqa
from lightgbm import LGBMClassifier  # noqa
from optuna.samplers import RandomSampler, TPESampler
from scipy import stats
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.ensemble import (  # noqa
    ExtraTreesClassifier,
    HistGradientBoostingClassifier,
    RandomForestClassifier,
    StackingClassifier,
    VotingClassifier,
)
from sklearn.exceptions import ConvergenceWarning
from sklearn.linear_model import LogisticRegression  # noqa
from sklearn.metrics import (
    accuracy_score,
    balanced_accuracy_score,
    f1_score,
    jaccard_score,
    precision_score,
    recall_score,
    roc_auc_score,
)
from sklearn.model_selection import StratifiedGroupKFold, StratifiedKFold
from sklearn.naive_bayes import GaussianNB  # noqa
from sklearn.neighbors import KNeighborsClassifier  # noqa
from sklearn.neural_network import MLPClassifier  # noqa
from sklearn.svm import SVC  # noqa
from xgboost import XGBClassifier  # noqa

from mamut.preprocessing.preprocessing import Preprocessor
from mamut.utils.utils import (
    SEARCH_PROFILES,
    adjust_search_spaces,
    model_names_for_profile,
    model_param_dict,
    sample_parameter,
)

# Raise Optuna's log threshold to WARNING so per-trial INFO messages are hidden.
optuna.logging.set_verbosity(optuna.logging.WARNING)
# Ignore scikit-learn ConvergenceWarning messages (filter is installed globally
# at import time of this module).
warnings.filterwarnings("ignore", category=ConvergenceWarning)

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


@dataclass(frozen=True)
class CandidateModel:
    """Immutable description of one classifier that can take part in selection.

    Instances are frozen, so fields cannot be reassigned after construction.
    """

    # Identifier of the candidate.
    name: str
    # Callable invoked as ``factory(random_state, n_jobs)``; returns a new estimator.
    estimator_factory: Callable[[Optional[int], Optional[int]], object]
    # Coarse cost label; this class only stores it.
    runtime_cost: Literal["low", "medium", "high"] = "medium"
    # Capability flag; this class only stores it.
    supports_predict_proba: bool = True
    # Name of the preprocessing profile associated with the candidate.
    preprocessing_profile: Literal["generic_ohe", "tree_ohe", "native_categorical"] = (
        "generic_ohe"
    )

    def create(self, *, random_state: Optional[int], n_jobs: Optional[int]):
        """Build a fresh estimator by calling the stored factory.

        Both arguments are keyword-only here and are forwarded positionally,
        in the order ``(random_state, n_jobs)``.
        """
        build = self.estimator_factory
        return build(random_state, n_jobs)


def _sklearn_factory(model_class, **default_params):
    def factory(random_state: Optional[int], n_jobs: Optional[int]):
        params = default_params.copy()
        valid_params = model_class().get_params()
        if random_state is not None and "random_state" in valid_params:
            params.setdefault("random_state", random_state)
        if n_jobs is not None and "n_jobs" in valid_params:
            params.setdefault("n_jobs", n_jobs)
        return model_class(**params)

    return factory


def _xgboost_factory(random_state: Optional[int], n_jobs: Optional[int]):
    """Create an ``XGBClassifier`` with logloss evaluation and verbosity 0.

    ``random_state`` and ``n_jobs`` are forwarded only when they are not ``None``.
    """
    params = {"eval_metric": "logloss", "verbosity": 0}
    optional = (("random_state", random_state), ("n_jobs", n_jobs))
    for key, value in optional:
        if value is not None:
            params[key] = value
    return XGBClassifier(**params)


def _lightgbm_factory(random_state: Optional[int], n_jobs: Optional[int]):
    """Create an ``LGBMClassifier`` with ``verbosity=-1``.

    ``random_state`` and ``n_jobs`` are forwarded only when they are not ``None``.
    """
    params = {"verbosity": -1}
    optional = (("random_state", random_state), ("n_jobs", n_jobs))
    for key, value in optional:
        if value is not None:
            params[key] = value
    return LGBMClassifier(**params)


def _catboost_factory(random_state: Optional[int], n_jobs: Optional[int]):
    """Create a ``CatBoostClassifier`` with file writing and verbose output off.

    CatBoost uses its own parameter names: ``random_state`` is passed as
    ``random_seed`` and ``n_jobs`` as ``thread_count``; each only when not ``None``.
    """
    params = {"allow_writing_files": False, "verbose": False}
    optional = (("random_seed", random_state), ("thread_count", n_jobs))
    for key, value in optional:
        if value is not None:
            params[key] = value
    return CatBoostClassifier(**params)


# Registry of every supported candidate, keyed by estimator class name.
# Insertion order is significant: ``available_model_names`` exposes it as-is.
MODEL_REGISTRY = {
    "LogisticRegression": CandidateModel(
        name="LogisticRegression",
        estimator_factory=_sklearn_factory(LogisticRegression),
        runtime_cost="low",
    ),
    "RandomForestClassifier": CandidateModel(
        name="RandomForestClassifier",
        estimator_factory=_sklearn_factory(RandomForestClassifier),
        runtime_cost="medium",
        preprocessing_profile="tree_ohe",
    ),
    "ExtraTreesClassifier": CandidateModel(
        name="ExtraTreesClassifier",
        estimator_factory=_sklearn_factory(ExtraTreesClassifier),
        runtime_cost="medium",
        preprocessing_profile="tree_ohe",
    ),
    "HistGradientBoostingClassifier": CandidateModel(
        name="HistGradientBoostingClassifier",
        estimator_factory=_sklearn_factory(HistGradientBoostingClassifier),
        runtime_cost="medium",
        preprocessing_profile="tree_ohe",
    ),
    "SVC": CandidateModel(
        name="SVC",
        estimator_factory=_sklearn_factory(SVC),
        runtime_cost="high",
    ),
    "XGBClassifier": CandidateModel(
        name="XGBClassifier",
        estimator_factory=_xgboost_factory,
        runtime_cost="high",
        preprocessing_profile="tree_ohe",
    ),
    "LGBMClassifier": CandidateModel(
        name="LGBMClassifier",
        estimator_factory=_lightgbm_factory,
        runtime_cost="medium",
        preprocessing_profile="native_categorical",
    ),
    "CatBoostClassifier": CandidateModel(
        name="CatBoostClassifier",
        estimator_factory=_catboost_factory,
        runtime_cost="high",
        preprocessing_profile="native_categorical",
    ),
    "MLPClassifier": CandidateModel(
        name="MLPClassifier",
        estimator_factory=_sklearn_factory(MLPClassifier),
        runtime_cost="high",
    ),
    "GaussianNB": CandidateModel(
        name="GaussianNB",
        estimator_factory=_sklearn_factory(GaussianNB),
        runtime_cost="low",
    ),
    "KNeighborsClassifier": CandidateModel(
        name="KNeighborsClassifier",
        estimator_factory=_sklearn_factory(KNeighborsClassifier),
        runtime_cost="medium",
    ),
}


def available_model_names() -> tuple[str, ...]:
    """Return the keys of ``MODEL_REGISTRY`` as a tuple, in registry order."""
    return tuple(MODEL_REGISTRY.keys())


def preprocessing_profile_for_model(
    model_name: str,
) -> Literal["generic_ohe", "tree_ohe", "native_categorical"]:
    """Look up the preprocessing profile registered for ``model_name``.

    Raises ``KeyError`` when ``model_name`` is not a key of ``MODEL_REGISTRY``.
    """
    candidate = MODEL_REGISTRY[model_name]
    return candidate.preprocessing_profile


def categorical_feature_names(X) -> list:
    """List the columns of ``X`` whose dtype is categorical, object, string or bool.

    Anything that is not a ``pandas.DataFrame`` yields an empty list. Column
    order follows ``X.columns``.
    """
    if not isinstance(X, pd.DataFrame):
        return []

    def _is_categorical_like(series) -> bool:
        # The dtype predicates are side-effect free, so short-circuiting is safe.
        return bool(
            isinstance(series.dtype, pd.CategoricalDtype)
            or pd.api.types.is_object_dtype(series)
            or pd.api.types.is_string_dtype(series)
            or pd.api.types.is_bool_dtype(series)
        )

    selected = []
    for column in X.columns:
        if _is_categorical_like(X[column]):
            selected.append(column)
    return selected


def fit_estimator(estimator, X, y):
    """Fit ``estimator`` on ``X`` and the flattened target, then return it.

    For a ``CatBoostClassifier`` given a DataFrame, the categorical-like
    columns found by ``categorical_feature_names`` are passed as
    ``cat_features``. Every other case uses a plain ``fit(X, y)``.
    """
    target = np.asarray(y).ravel()
    wants_cat_features = isinstance(estimator, CatBoostClassifier) and isinstance(
        X, pd.DataFrame
    )
    cat_features = categorical_feature_names(X) if wants_cat_features else []
    if cat_features:
        estimator.fit(X, target, cat_features=cat_features)
    else:
        estimator.fit(X, target)
    return estimator


def make_preprocessor(preprocessor_factory: Optional[Callable], model_name: str):
    """Build a preprocessor for ``model_name`` from an optional factory.

    Returns ``None`` when no factory is given. A factory whose signature has
    no parameters is called without arguments; any other factory receives
    ``model_name`` as its single positional argument.
    """
    if preprocessor_factory is None:
        return None
    parameters = inspect.signature(preprocessor_factory).parameters
    takes_arguments = len(parameters) > 0
    if takes_arguments:
        return preprocessor_factory(model_name)
    return preprocessor_factory()


class CandidatePipelineClassifier(ClassifierMixin, BaseEstimator):
    """Classifier wrapper that preprocesses raw input before delegating.

    ``fit`` optionally builds a ``Preprocessor`` for ``profile``, fits it with
    the data, and trains a clone of ``estimator`` on the result. ``predict``
    and ``predict_proba`` push new data through the same fitted preprocessor.
    """

    def __init__(
        self,
        estimator,
        preprocess: bool = True,
        profile: str = "generic_ohe",
        preprocessor_kwargs: Optional[dict] = None,
    ):
        # Parameters are stored untouched under their own names.
        self.estimator = estimator
        self.preprocess = preprocess
        self.profile = profile
        self.preprocessor_kwargs = preprocessor_kwargs

    def fit(self, X, y):
        """Fit the optional preprocessor and a clone of the wrapped estimator."""
        # ``profile`` always overrides a same-named entry in preprocessor_kwargs.
        options = {**dict(self.preprocessor_kwargs or {}), "profile": self.profile}
        if self.preprocess:
            self.preprocessor_ = Preprocessor(**options)
        else:
            self.preprocessor_ = None

        if self.preprocessor_ is None:
            features, target = X, np.asarray(y)
        else:
            # Work on copies so the caller's objects are not passed in directly.
            features, target = self.preprocessor_.fit_transform(
                pd.DataFrame(X).copy(), pd.Series(y).copy()
            )

        self.estimator_ = clone(self.estimator)
        fit_estimator(self.estimator_, features, target)
        self.classes_ = self.estimator_.classes_
        return self

    def _transform(self, X):
        """Apply the fitted preprocessor, or pass ``X`` through when there is none."""
        fitted = self.preprocessor_
        if fitted is not None:
            return fitted.transform(pd.DataFrame(X).copy())
        return X

    def predict(self, X):
        """Predict labels for ``X`` with the fitted estimator."""
        prepared = self._transform(X)
        return self.estimator_.predict(prepared)

    def predict_proba(self, X):
        """Predict class probabilities for ``X`` with the fitted estimator."""
        prepared = self._transform(X)
        return self.estimator_.predict_proba(prepared)


class ModelSelector:
    """Tune candidate classifiers with Optuna and compare them on validation data.

    For every selected model the hyper-parameters are searched with
    cross-validation on the training data (``optimize_model``); the tuned
    model is then refitted and scored on the validation split
    (``compare_models``).

    Two kinds of data may be supplied. ``X_train``/``X_validation`` are used
    as-is to fit and score models. When ``X_train_raw`` and a
    ``preprocessor_factory`` are both given, each cross-validation fold (and
    each final fit) builds and fits its own preprocessor on the raw training
    part only.
    """

    def __init__(
        self,
        X_train,
        y_train,
        X_validation,
        y_validation,
        score_metric: Callable,
        X_train_raw: Optional[pd.DataFrame] = None,
        y_train_raw: Optional[pd.Series] = None,
        X_validation_raw: Optional[pd.DataFrame] = None,
        y_validation_raw: Optional[pd.Series] = None,
        groups_train_raw: Optional[pd.Series] = None,
        preprocessor_factory: Optional[Callable] = None,
        exclude_models: Optional[List[str]] = None,
        include_models: Optional[List[str]] = None,
        search_profile: Literal["quick", "balanced", "thorough"] = "balanced",
        optimization_method: Literal["random_search", "bayes"] = "bayes",
        n_iterations: int = 50,
        random_state: Optional[int] = 42,
        n_jobs: Optional[int] = 1,
        verbose: bool = False,
    ):
        """Store the data, resolve the candidate models and build the CV splitter.

        Args:
            X_train, y_train: Training features/target used directly by models
                when no raw data or no preprocessor factory is available.
            X_validation, y_validation: Validation counterpart of the above.
            score_metric: Metric callable; its ``__name__`` selects how it is
                invoked (see ``_compute_score``).
            X_train_raw, y_train_raw, X_validation_raw, y_validation_raw:
                Optional raw data, preprocessed per fold / per final fit.
            groups_train_raw: Optional group labels; switches CV to
                ``StratifiedGroupKFold``.
            preprocessor_factory: Optional callable passed to
                ``make_preprocessor``.
            exclude_models, include_models: Mutually exclusive model filters.
            search_profile: Key of ``SEARCH_PROFILES`` used when neither
                filter is given.
            optimization_method: ``"bayes"`` uses ``TPESampler``; any other
                value uses ``RandomSampler``.
            n_iterations: Number of Optuna trials per model.
            random_state: Seed for samplers, splitters and estimators.
            n_jobs: Forwarded to the estimator factories.
            verbose: Shows the Optuna progress bar when true.

        Raises:
            ValueError: From ``_resolve_model_names`` or ``_make_splitter``.
        """
        self.X_train = X_train
        self.y_train = np.asarray(y_train)
        self.X_validation = X_validation
        self.y_validation = np.asarray(y_validation)
        self.X_train_raw = X_train_raw
        self.y_train_raw = pd.Series(y_train_raw) if y_train_raw is not None else None
        self.X_validation_raw = X_validation_raw
        self.y_validation_raw = (
            pd.Series(y_validation_raw) if y_validation_raw is not None else None
        )
        self.groups_train_raw = (
            pd.Series(groups_train_raw) if groups_train_raw is not None else None
        )
        self.preprocessor_factory = preprocessor_factory
        self.base_score_metric = score_metric
        self.search_profile = search_profile
        self.optimization_method = optimization_method
        self.n_jobs = n_jobs
        self.model_names = self._resolve_model_names(
            include_models=include_models,
            exclude_models=exclude_models,
            search_profile=search_profile,
        )
        self.models = [
            self._instantiate_model(
                model_name, random_state=random_state, n_jobs=n_jobs
            )
            for model_name in self.model_names
        ]
        self.n_classes_ = len(np.unique(y_train))
        self.binary = self.n_classes_ == 2
        # Strings are immutable, so the name can be stored without copying.
        self.score_metric_name = score_metric.__name__
        self.roc = self.score_metric_name == "roc_auc_score"
        # Two-argument scorer that applies the metric-specific keyword arguments.
        self.score_metric = lambda y_true, y_pred: self._compute_score(
            score_metric, y_true, y_pred
        )
        self.optuna_sampler = (
            TPESampler(seed=random_state)
            if optimization_method == "bayes"
            else RandomSampler(seed=random_state)
        )
        self.n_iterations = n_iterations
        self.verbose = verbose
        self.random_state = random_state
        self.SKF_ = self._make_splitter(
            self._cv_y(), groups=self.groups_train_raw, requested_splits=5
        )

    @staticmethod
    def _resolve_model_names(
        *,
        include_models: Optional[List[str]],
        exclude_models: Optional[List[str]],
        search_profile: str,
    ) -> list[str]:
        """Return the ordered list of model names to evaluate.

        ``include_models`` is used verbatim; ``exclude_models`` starts from all
        registered models; otherwise the search profile decides.

        Raises:
            ValueError: Unknown profile, both filters given, unsupported
                model names, or an empty resulting selection.
        """
        if search_profile not in SEARCH_PROFILES:
            raise ValueError(
                "search_profile must be one of: 'quick', 'balanced', 'thorough'."
            )
        if include_models and exclude_models:
            raise ValueError("Use include_models or exclude_models, not both.")

        if include_models:
            candidate_names = list(include_models)
        elif exclude_models:
            candidate_names = list(available_model_names())
        else:
            candidate_names = list(model_names_for_profile(search_profile))

        unknown_models = sorted(set(candidate_names) - set(available_model_names()))
        if unknown_models:
            valid_models = ", ".join(available_model_names())
            raise ValueError(
                f"Selected model names are unsupported: {unknown_models}. "
                f"Valid model names are: {valid_models}."
            )

        if exclude_models:
            # Build the lookup set once instead of once per candidate.
            excluded = set(exclude_models)
            candidate_names = [
                model_name
                for model_name in candidate_names
                if model_name not in excluded
            ]
        if not candidate_names:
            raise ValueError("At least one supported model must be selected.")
        return candidate_names

    @staticmethod
    def _instantiate_model(
        model_name: str, random_state: Optional[int], n_jobs: Optional[int]
    ):
        """Create a new estimator for ``model_name`` from ``MODEL_REGISTRY``."""
        return MODEL_REGISTRY[model_name].create(
            random_state=random_state,
            n_jobs=n_jobs,
        )

    @staticmethod
    def _safe_index(data, idx):
        """Index ``data`` positionally: ``.iloc`` when available, else ``[]``."""
        if hasattr(data, "iloc"):
            return data.iloc[idx]
        return data[idx]

    @staticmethod
    def _effective_cv_splits(y, requested_splits: int) -> int:
        """Cap the fold count by the size of the rarest class.

        Raises:
            ValueError: ``y`` is empty, or some class has fewer than two samples.
        """
        class_counts = pd.Series(y).value_counts()
        if class_counts.empty:
            raise ValueError("Cannot build stratified folds without target values.")
        effective_splits = min(requested_splits, int(class_counts.min()))
        if effective_splits < 2:
            raise ValueError(
                "At least two samples are required in every class for model selection."
            )
        return effective_splits

    def _cv_y(self):
        """Return the target used for splitting: the raw one when present."""
        return self.y_train_raw if self.y_train_raw is not None else self.y_train

    def _make_splitter(
        self, y, groups=None, requested_splits: int = 5, repeat: int = 0
    ):
        """Build a shuffled stratified splitter, grouped when ``groups`` is given.

        ``repeat`` is added to ``random_state`` so repeated CV rounds shuffle
        differently; with ``random_state=None`` the seed stays ``None``.

        Raises:
            ValueError: Too few samples per class or fewer than two groups.
        """
        effective_splits = self._effective_cv_splits(y, requested_splits)
        seed = None if self.random_state is None else self.random_state + repeat
        if groups is not None:
            # A fold needs at least one group, so the group count is a cap too.
            effective_splits = min(effective_splits, pd.Series(groups).nunique())
            if effective_splits < 2:
                raise ValueError("At least two groups are required for grouped CV.")
            return StratifiedGroupKFold(
                n_splits=effective_splits, shuffle=True, random_state=seed
            )
        return StratifiedKFold(
            n_splits=effective_splits, shuffle=True, random_state=seed
        )

    def _compute_score(self, score_metric, y_true, y_pred):
        """Call ``score_metric`` with the keyword arguments its name requires.

        ROC AUC is called plainly for binary targets and with
        ``multi_class="ovr", average="weighted"`` otherwise. Precision, recall,
        F1 and Jaccard use ``average="weighted", zero_division=0``. Any other
        metric is called with just ``(y_true, y_pred)``.
        """
        y_true = np.asarray(y_true)
        y_pred = np.asarray(y_pred)
        if self.roc:
            if self.binary:
                return score_metric(y_true, y_pred)
            return score_metric(y_true, y_pred, multi_class="ovr", average="weighted")
        if self.score_metric_name in {
            "precision_score",
            "recall_score",
            "f1_score",
            "jaccard_score",
        }:
            return score_metric(
                y_true,
                y_pred,
                average="weighted",
                zero_division=0,
            )
        return score_metric(y_true, y_pred)

    def _predict_for_metric(self, model, X):
        """Return the predictions the selection metric expects.

        ROC AUC needs probabilities (positive-class column only for binary
        targets); every other metric is fed predicted labels.
        """
        if not self.roc:
            return model.predict(X)
        probabilities = model.predict_proba(X)
        if self.binary:
            return probabilities[:, 1]
        return probabilities

    def objective(self, trial, model):
        """Optuna objective: mean cross-validated score of one sampled configuration.

        Raises:
            ValueError: The model's class name has no entry in ``model_param_dict``.
        """
        model_name = model.__class__.__name__
        if model_name not in model_param_dict:
            raise ValueError(f"Model {model_name} not supported")
        param_grid = model_param_dict[model_name]

        param = {
            param_name: sample_parameter(trial, param_name, value)
            for param_name, value in param_grid.items()
        }
        param = adjust_search_spaces(param, model)
        configured_model = clone(model).set_params(**param)

        cv_scores = []
        X_for_split = self.X_train_raw if self.X_train_raw is not None else self.X_train
        y_for_split = self._cv_y()
        for train_idx, val_idx in self.SKF_.split(
            X_for_split, y_for_split, self.groups_train_raw
        ):
            X_train_fold, y_train_fold, X_val_fold, y_val_fold = self._prepare_cv_fold(
                train_idx, val_idx, model_name
            )
            # Each fold trains its own clone so no state leaks between folds.
            fold_model = clone(configured_model)
            fit_estimator(fold_model, X_train_fold, y_train_fold)
            val_pred = self._predict_for_metric(fold_model, X_val_fold)
            cv_scores.append(self.score_metric(y_val_fold, val_pred))
        return np.mean(cv_scores)

    def _prepare_cv_fold(self, train_idx, val_idx, model_name: str):
        """Return ``(X_train, y_train, X_val, y_val)`` for one CV fold.

        Without raw data or without a preprocessor factory the stored
        ``X_train``/``y_train`` are sliced. Otherwise the raw training data is
        sliced and a new preprocessor is fitted on the training part only.
        """
        if self.X_train_raw is None or self.preprocessor_factory is None:
            X_train_fold = self._safe_index(self.X_train, train_idx)
            X_val_fold = self._safe_index(self.X_train, val_idx)
            y_train_fold = self.y_train[train_idx]
            y_val_fold = self.y_train[val_idx]
            return X_train_fold, y_train_fold, X_val_fold, y_val_fold

        X_train_fold_raw = self.X_train_raw.iloc[train_idx]
        X_val_fold_raw = self.X_train_raw.iloc[val_idx]
        y_train_fold_raw = self.y_train_raw.iloc[train_idx]
        y_val_fold = self.y_train_raw.iloc[val_idx].to_numpy()

        preprocessor = make_preprocessor(self.preprocessor_factory, model_name)
        if preprocessor is None:
            return (
                self._as_model_input(X_train_fold_raw),
                y_train_fold_raw.to_numpy(),
                self._as_model_input(X_val_fold_raw),
                y_val_fold,
            )
        X_train_fold, y_train_fold = preprocessor.fit_transform(
            X_train_fold_raw.copy(), y_train_fold_raw.copy()
        )
        X_val_fold = preprocessor.transform(X_val_fold_raw.copy())
        return X_train_fold, np.asarray(y_train_fold), X_val_fold, y_val_fold

    def optimize_model(self, model):
        """Run the Optuna study for ``model``.

        Returns:
            ``(best_params, best_value, duration_seconds, study)``. A
            ``hidden_layer_sizes`` value stored as a string is parsed back
            into a Python literal when possible.
        """
        study = optuna.create_study(direction="maximize", sampler=self.optuna_sampler)
        start_time = time.time()
        study.optimize(
            lambda trial: self.objective(trial, model),
            n_trials=self.n_iterations,
            show_progress_bar=self.verbose,
        )
        duration = time.time() - start_time

        best_params = adjust_search_spaces(study.best_params, model)
        hidden_sizes = best_params.get("hidden_layer_sizes")
        if isinstance(hidden_sizes, str):
            try:
                best_params["hidden_layer_sizes"] = ast.literal_eval(hidden_sizes)
            except (ValueError, SyntaxError):
                # Leave the value untouched when it is not a valid literal.
                pass
        return best_params, study.best_value, duration, study

    def tune_and_fit_model(self, model_name: str):
        """Tune one model and fit it on the full training data.

        Returns:
            ``(fitted_model, preprocessor, score, duration, study)``;
            ``preprocessor`` is ``None`` when none was built.
        """
        model = self._instantiate_model(
            model_name, random_state=self.random_state, n_jobs=self.n_jobs
        )
        params, score, duration, study = self.optimize_model(model)
        fitted_model = clone(model).set_params(**params)

        if self.X_train_raw is None or self.y_train_raw is None:
            # No raw data: fall back to the stored training arrays, matching
            # what the cross-validation folds use in that situation.
            preprocessor = None
            X_train, y_train = self.X_train, self.y_train
        else:
            preprocessor = make_preprocessor(self.preprocessor_factory, model_name)
            if preprocessor is None:
                X_train = self._as_model_input(self.X_train_raw)
                y_train = np.asarray(self.y_train_raw)
            else:
                X_train, y_train = preprocessor.fit_transform(
                    self.X_train_raw.copy(), self.y_train_raw.copy()
                )
        fit_estimator(fitted_model, X_train, np.asarray(y_train))
        return fitted_model, preprocessor, score, duration, study

    def nested_cv_selection_scores(
        self,
        *,
        X: pd.DataFrame,
        y: pd.Series,
        groups: Optional[pd.Series],
        cv_splits: int,
        cv_repeats: int,
        confidence_level: float,
    ) -> pd.DataFrame:
        """Score every model with nested, repeated cross-validation.

        For each outer fold and each model a fresh ``ModelSelector`` tunes the
        model on the outer-training part and scores it on the outer-validation
        part. Returns one summary row per model, best mean score first.
        """
        rows = {model_name: [] for model_name in self.model_names}
        durations = {model_name: 0.0 for model_name in self.model_names}
        # Positional indices from the splitter require a clean 0..n-1 index.
        y = pd.Series(y).reset_index(drop=True)
        X = pd.DataFrame(X).reset_index(drop=True)
        groups = (
            pd.Series(groups).reset_index(drop=True) if groups is not None else None
        )

        for repeat in range(cv_repeats):
            splitter = self._make_splitter(
                y, groups=groups, requested_splits=cv_splits, repeat=repeat
            )
            for train_idx, validation_idx in splitter.split(X, y, groups):
                X_train = X.iloc[train_idx]
                y_train = y.iloc[train_idx]
                X_validation = X.iloc[validation_idx]
                y_validation = y.iloc[validation_idx]
                groups_train = groups.iloc[train_idx] if groups is not None else None
                for model_name in self.model_names:
                    start_time = time.time()
                    selector = ModelSelector(
                        X_train.to_numpy(),
                        y_train.to_numpy(),
                        X_validation.to_numpy(),
                        y_validation.to_numpy(),
                        score_metric=self.base_score_metric,
                        X_train_raw=X_train,
                        y_train_raw=y_train,
                        X_validation_raw=X_validation,
                        y_validation_raw=y_validation,
                        groups_train_raw=groups_train,
                        preprocessor_factory=self.preprocessor_factory,
                        include_models=[model_name],
                        search_profile=self.search_profile,
                        optimization_method=self.optimization_method,
                        n_iterations=self.n_iterations,
                        random_state=(
                            None
                            if self.random_state is None
                            else self.random_state + repeat
                        ),
                        n_jobs=self.n_jobs,
                        verbose=self.verbose,
                    )
                    # The validation summary is the second-to-last element.
                    *_, validation_summary, _studies = selector.compare_models()
                    rows[model_name].append(
                        float(validation_summary.iloc[0][self.score_metric_name])
                    )
                    durations[model_name] += time.time() - start_time

        summaries = [
            {
                "model": model_name,
                "metric": self.score_metric_name,
                **_summarize_scores(scores, confidence_level),
                "selection_duration": durations[model_name],
                "status": "ok",
            }
            for model_name, scores in rows.items()
        ]
        return pd.DataFrame(summaries).sort_values(
            by=["mean_score", "std_score", "selection_duration"],
            ascending=[False, True, True],
            na_position="last",
        )

    def compare_models(self):
        """Tune, refit and validate every candidate; pick the best on validation.

        Returns:
            A 9-tuple ``(best_model, best_params, best_score, fitted_models,
            fitted_preprocessors, fitted_training_data, fitted_validation_data,
            validation_summary, studies)``. The dictionaries are keyed by model
            class name. Ties keep the earlier model (strict ``>`` comparison).
        """
        best_model = None
        score_for_best_model = -np.inf
        params_for_best_model = None
        fitted_models = {}
        fitted_preprocessors = {}
        fitted_training_data = {}
        fitted_validation_data = {}
        summary_rows = []
        studies = {}

        for model in self.models:
            model_name = model.__class__.__name__
            log.info("Optimizing model: %s", model_name)
            params, score, duration, study = self.optimize_model(model)
            log.info(
                "Best parameters for %s: %s, score: %.4f %s",
                model_name,
                params,
                score,
                self.score_metric_name,
            )

            tuned_model = clone(model).set_params(**params)
            (
                X_train,
                y_train,
                X_validation,
                y_validation,
                preprocessor,
            ) = self._prepare_final_model_data(model_name)
            fit_estimator(tuned_model, X_train, y_train)

            fitted_models[model_name] = tuned_model
            fitted_preprocessors[model_name] = preprocessor
            fitted_training_data[model_name] = (X_train, y_train)
            fitted_validation_data[model_name] = (X_validation, y_validation)
            studies[model_name] = study

            score_on_validation = self.score_metric(
                y_validation, self._predict_for_metric(tuned_model, X_validation)
            )
            if score_on_validation > score_for_best_model:
                score_for_best_model = score_on_validation
                best_model = tuned_model
                params_for_best_model = params

            scores_on_validation = self._score_model_with_metrics(
                tuned_model, X_validation, y_validation
            )
            summary_rows.append(
                {
                    "model": model_name,
                    **scores_on_validation,
                    "duration": duration,
                }
            )

        # Build the frame once; concatenating inside the loop copies all
        # previous rows on every iteration.
        validation_summary = pd.DataFrame(summary_rows)

        log.info(
            "Found best model: %s with parameters %s and score %.4f %s.",
            best_model.__class__.__name__,
            params_for_best_model,
            score_for_best_model,
            self.score_metric_name,
        )
        return (
            best_model,
            params_for_best_model,
            score_for_best_model,
            fitted_models,
            fitted_preprocessors,
            fitted_training_data,
            fitted_validation_data,
            validation_summary,
            studies,
        )

    def _prepare_final_model_data(self, model_name: str):
        """Return ``(X_train, y_train, X_validation, y_validation, preprocessor)``.

        The stored arrays are returned (with ``None`` as preprocessor) unless
        raw training data, raw validation data and a factory are all present;
        in that case a new preprocessor is fitted on the raw training data.
        """
        if any(
            [
                self.X_train_raw is None,
                self.X_validation_raw is None,
                self.preprocessor_factory is None,
            ]
        ):
            return (
                self.X_train,
                self.y_train,
                self.X_validation,
                self.y_validation,
                None,
            )

        preprocessor = make_preprocessor(self.preprocessor_factory, model_name)
        y_train_raw = self.y_train_raw.copy()
        y_validation_raw = self.y_validation_raw.to_numpy()
        if preprocessor is None:
            return (
                self._as_model_input(self.X_train_raw),
                y_train_raw.to_numpy(),
                self._as_model_input(self.X_validation_raw),
                y_validation_raw,
                None,
            )
        X_train, y_train = preprocessor.fit_transform(
            self.X_train_raw.copy(), y_train_raw.copy()
        )
        X_validation = preprocessor.transform(self.X_validation_raw.copy())
        return (
            X_train,
            np.asarray(y_train),
            X_validation,
            y_validation_raw,
            preprocessor,
        )

    def _score_model_with_metrics(
        self, fitted_model, X_validation=None, y_validation=None
    ):
        """Score ``fitted_model`` with the standard metric set.

        Returns a dict whose first key is the selection metric, followed by
        the remaining standard metrics. ``roc_auc_score`` is ``nan`` when the
        model exposes no ``predict_proba``. A selection metric outside the
        standard set is computed through ``self.score_metric`` on the
        predicted labels.

        Raises:
            ValueError: The model has no ``predict`` method.
        """
        X_validation = self.X_validation if X_validation is None else X_validation
        y_validation = (
            self.y_validation if y_validation is None else np.asarray(y_validation)
        )
        if not hasattr(fitted_model, "predict"):
            raise ValueError(
                "The model is not fitted and can not be scored with any metric."
            )

        y_pred = fitted_model.predict(X_validation)
        if hasattr(fitted_model, "predict_proba"):
            y_pred_proba = fitted_model.predict_proba(X_validation)
            if self.binary:
                y_pred_proba = y_pred_proba[:, 1]
            roc_auc = roc_auc_score(
                y_validation,
                y_pred_proba,
                multi_class="ovr",
                average="weighted",
            )
        else:
            # Estimators without probability output cannot provide ROC AUC.
            roc_auc = np.nan

        results = {
            "accuracy_score": accuracy_score(y_validation, y_pred),
            "balanced_accuracy_score": balanced_accuracy_score(y_validation, y_pred),
            "precision_score": precision_score(
                y_validation, y_pred, average="weighted", zero_division=0
            ),
            "recall_score": recall_score(
                y_validation, y_pred, average="weighted", zero_division=0
            ),
            "f1_score": f1_score(
                y_validation, y_pred, average="weighted", zero_division=0
            ),
            "jaccard_score": jaccard_score(
                y_validation, y_pred, average="weighted", zero_division=0
            ),
            "roc_auc_score": roc_auc,
        }

        # Put the selection metric first so it leads the summary columns.
        if self.score_metric_name in results:
            primary_score = results.pop(self.score_metric_name)
        else:
            primary_score = self.score_metric(y_validation, y_pred)
        return {self.score_metric_name: primary_score, **results}

    @staticmethod
    def _as_model_input(data):
        """Convert a DataFrame to a NumPy array; return anything else unchanged."""
        if isinstance(data, pd.DataFrame):
            return data.to_numpy()
        return data
def repeated_cv_selection_scores( *, fitted_models: dict, X: pd.DataFrame, y: pd.Series, metric_name: str, binary: bool, score_metric: Callable, preprocessor_factory: Optional[Callable], cv_splits: int, cv_repeats: int, confidence_level: float, random_state: Optional[int], ) -> pd.DataFrame: y = pd.Series(y) effective_splits = ModelSelector._effective_cv_splits(y, cv_splits) rows = [] for model_name, estimator in fitted_models.items(): scores = [] start_time = time.time() status = "ok" for repeat in range(cv_repeats): splitter = StratifiedKFold( n_splits=effective_splits, shuffle=True, random_state=None if random_state is None else random_state + repeat, ) for train_idx, val_idx in splitter.split(X, y): try: X_train_fold, y_train_fold, X_val_fold, y_val_fold = ( _prepare_raw_cv_fold( X=X, y=y, model_name=model_name, train_idx=train_idx, val_idx=val_idx, preprocessor_factory=preprocessor_factory, ) ) model = clone(estimator) fit_estimator(model, X_train_fold, y_train_fold) if metric_name == "roc_auc_score": predictions = model.predict_proba(X_val_fold) if binary: predictions = predictions[:, 1] else: predictions = model.predict(X_val_fold) scores.append( _compute_metric( score_metric, metric_name, binary, y_val_fold, predictions ) ) except ( Exception ) as exc: # pragma: no cover - data-dependent user edge cases status = f"failed: {exc.__class__.__name__}" scores = [] break if status != "ok": break score_summary = _summarize_scores(scores, confidence_level) rows.append( { "model": model_name, "metric": metric_name, **score_summary, "selection_duration": time.time() - start_time, "status": status, } ) return pd.DataFrame(rows).sort_values( by=["mean_score", "std_score", "selection_duration"], ascending=[False, True, True], na_position="last", ) def select_from_repeated_cv_summary( summary: pd.DataFrame, practical_margin: float ) -> str: return repeated_cv_selection_decision(summary, practical_margin)["model"] def repeated_cv_selection_decision( summary: pd.DataFrame, 
practical_margin: float ) -> dict: valid_summary = summary.loc[summary["status"].eq("ok")].dropna( subset=["mean_score"] ) if valid_summary.empty: raise ValueError("No successful repeated-CV selection scores are available.") best_mean = float(valid_summary["mean_score"].max()) contenders = valid_summary.loc[ valid_summary["mean_score"] >= best_mean - practical_margin ].copy() contenders = contenders.sort_values( by=["std_score", "selection_duration", "model"], ascending=[True, True, True], na_position="last", ) selected = contenders.iloc[0] status = "close_call" if len(contenders) > 1 else "confirmed" return { "model": str(selected["model"]), "status": status, "best_mean_score": best_mean, "n_contenders": int(len(contenders)), } def _prepare_raw_cv_fold( *, X: pd.DataFrame, y: pd.Series, model_name: str, train_idx, val_idx, preprocessor_factory: Optional[Callable], ): X_train_fold_raw = X.iloc[train_idx] X_val_fold_raw = X.iloc[val_idx] y_train_fold_raw = y.iloc[train_idx] y_val_fold = y.iloc[val_idx].to_numpy() if preprocessor_factory is None: return ( ModelSelector._as_model_input(X_train_fold_raw), y_train_fold_raw.to_numpy(), ModelSelector._as_model_input(X_val_fold_raw), y_val_fold, ) preprocessor = make_preprocessor(preprocessor_factory, model_name) if preprocessor is None: return ( ModelSelector._as_model_input(X_train_fold_raw), y_train_fold_raw.to_numpy(), ModelSelector._as_model_input(X_val_fold_raw), y_val_fold, ) X_train_fold, y_train_fold = preprocessor.fit_transform( X_train_fold_raw.copy(), y_train_fold_raw.copy() ) X_val_fold = preprocessor.transform(X_val_fold_raw.copy()) return X_train_fold, np.asarray(y_train_fold), X_val_fold, y_val_fold def _compute_metric(score_metric, metric_name: str, binary: bool, y_true, predictions): return score_metric(y_true, predictions) def _summarize_scores(scores, confidence_level: float) -> dict: score_array = pd.Series(scores, dtype="float64").dropna().to_numpy() n_scores = len(score_array) if n_scores == 0: 
return { "mean_score": np.nan, "std_score": np.nan, "ci_low": np.nan, "ci_high": np.nan, "n_scores": 0, } mean_score = float(np.mean(score_array)) std_score = float(np.std(score_array, ddof=1)) if n_scores > 1 else 0.0 if n_scores > 1: critical_value = stats.t.ppf((1 + confidence_level) / 2, df=n_scores - 1) margin = critical_value * std_score / np.sqrt(n_scores) ci_low = max(0.0, mean_score - margin) ci_high = min(1.0, mean_score + margin) else: ci_low = mean_score ci_high = mean_score return { "mean_score": mean_score, "std_score": std_score, "ci_low": ci_low, "ci_high": ci_high, "n_scores": n_scores, }