import logging
import os
import time
import warnings
from typing import List, Literal, Optional
import joblib
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, ClassifierMixin, clone
from sklearn.ensemble import VotingClassifier
from sklearn.metrics import (
accuracy_score,
balanced_accuracy_score,
f1_score,
jaccard_score,
precision_score,
recall_score,
roc_auc_score,
)
from sklearn.model_selection import GroupShuffleSplit, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from mamut.evidence import build_evidence_report
from mamut.preprocessing.preprocessing import Preprocessor
from mamut.utils.utils import metric_dict
from .evaluation import ModelEvaluator
from .model_selection import (
CandidatePipelineClassifier,
ModelSelector,
available_model_names,
fit_estimator,
preprocessing_profile_for_model,
repeated_cv_selection_decision,
)
log = logging.getLogger(__name__)
class LabelDecodedClassifier(ClassifierMixin, BaseEstimator):
    """Adapter around a classifier trained on encoded labels.

    Predictions returned by ``predict`` are mapped back to the original label
    values through ``label_encoder``; probabilities are passed through
    unchanged.

    Parameters
    ----------
    estimator
        Classifier that is fitted on, and predicts, encoded labels.
    label_encoder
        Encoder exposing ``transform``, ``inverse_transform`` and ``classes_``.
        ``fit`` only calls ``transform``, so the encoder must already be fitted.
    """

    def __init__(self, estimator, label_encoder):
        self.estimator = estimator
        self.label_encoder = label_encoder

    def fit(self, X, y):
        """Encode ``y`` and fit the wrapped estimator; returns ``self``."""
        # fit_estimator is a project helper; presumably it calls estimator.fit
        # with model-specific handling -- confirm in model_selection.
        fit_estimator(self.estimator, X, self.label_encoder.transform(y))
        return self

    def predict(self, X):
        """Predict with the wrapped estimator and decode to original labels."""
        raw_codes = np.asarray(self.estimator.predict(X))
        # inverse_transform needs a flat integer vector, whatever shape/dtype
        # the wrapped estimator returned.
        flat_codes = raw_codes.astype(int).ravel()
        return self.label_encoder.inverse_transform(flat_codes)

    def predict_proba(self, X):
        """Return the wrapped estimator's class probabilities unchanged."""
        return self.estimator.predict_proba(X)

    @property
    def classes_(self):
        """Original class labels, as stored on the label encoder."""
        return self.label_encoder.classes_
class Mamut:
"""
A class used to manage the machine learning pipeline, including preprocessing, model selection, and evaluation.
Attributes
----------
preprocess : bool
Whether to apply preprocessing to the data.
imb_threshold : float
Threshold for detecting imbalanced data.
exclude_models : Optional[List[str]]
List of models to exclude from selection.
score_metric : callable
Metric used to evaluate model performance.
optimization_method : Literal["random_search", "bayes"]
Method for hyperparameter optimization.
n_iterations : int
Number of iterations for optimization (must be at least 1).
random_state : Optional[int]
Random state for reproducibility.
preprocessor : Preprocessor
Preprocessor object for data preprocessing.
le : LabelEncoder
Label encoder for target variable.
model_selector : ModelSelector
Object for model selection.
X : pd.DataFrame
Input features.
y : pd.Series
Target variable.
X_train : pd.DataFrame
Training features.
X_validation : pd.DataFrame
Validation features used for model selection.
y_train : pd.Series
Training target variable.
y_validation : pd.Series
Validation target variable used for model selection.
X_holdout : Optional[pd.DataFrame]
Optional final holdout features used only for final evaluation.
y_holdout : Optional[pd.Series]
Optional final holdout target used only for final evaluation.
raw_fitted_models_ : Optional[List[Pipeline]]
List of raw fitted models.
fitted_models_ : Optional[List[Pipeline]]
List of fitted models with preprocessing.
best_model_ : Optional[Pipeline]
Best model pipeline.
best_score_ : float
Best model score.
training_summary_ : dict
Summary of the training process.
optuna_studies_ : dict
Optuna studies for hyperparameter optimization.
ensemble_ : Optional[Pipeline]
Ensemble model pipeline.
greedy_ensemble_ : Optional[Pipeline]
Greedy ensemble model pipeline.
ensemble_models_ : Optional[List[Pipeline]]
List of models in the ensemble.
imbalanced_ : bool
Whether the data is imbalanced.
Methods
-------
fit(X: pd.DataFrame, y: pd.Series) -> Pipeline
Fits the model to the data.
predict(X: pd.DataFrame) -> np.ndarray
Predicts the target variable for the given data.
predict_proba(X: pd.DataFrame) -> np.ndarray
Predicts the probabilities of the target variable for the given data.
evaluate() -> None
Evaluates the fitted models.
save_best_model(path: str) -> None
Saves the best model to the specified path.
create_ensemble(voting: Literal["soft", "hard"] = "soft") -> Pipeline
Creates an ensemble of the fitted models.
create_greedy_ensemble(n_models: int = 6, voting: Literal["soft", "hard"] = "soft") -> Pipeline
Creates a greedy ensemble of the fitted models.
"""
def __init__(
    self,
    preprocess: bool = True,
    imb_threshold: float = 0.10,
    exclude_models: Optional[List[str]] = None,
    include_models: Optional[List[str]] = None,
    score_metric: Literal[
        "accuracy",
        "precision",
        "recall",
        "f1",
        "balanced_accuracy",
        "jaccard",
        "roc_auc_score",
    ] = "f1",
    search_profile: Literal["quick", "balanced", "thorough"] = "balanced",
    optimization_method: Literal["random_search", "bayes"] = "bayes",
    n_iterations: int = 30,
    random_state: Optional[int] = 42,
    n_jobs: Optional[int] = 1,
    selection_strategy: Literal[
        "single_split", "nested_cv", "repeated_cv"
    ] = "single_split",
    selection_cv_splits: int = 5,
    selection_cv_repeats: int = 2,
    selection_practical_margin: float = 0.005,
    preprocessing_profile: Literal["auto", "generic_ohe"] = "auto",
    validation_size: float = 0.2,
    holdout_size: Optional[float] = None,
    save_models: bool = False,
    models_output_dir: str = "fitted_models",
    refit_final_model: bool = False,
    verbose: bool = False,
    evidence_cv_splits: int = 5,
    evidence_cv_repeats: int = 3,
    evidence_confidence_level: float = 0.95,
    evidence_practical_margin: float = 0.01,
    **preprocessor_kwargs,
):
    """
    Constructs all the necessary attributes for the Mamut object.

    Parameters
    ----------
    preprocess : bool
        Whether to apply preprocessing to the data.
    imb_threshold : float
        Threshold for detecting imbalanced data. An explicit
        ``imbalance_threshold`` entry in ``preprocessor_kwargs`` takes
        precedence over this value.
    exclude_models : Optional[List[str]]
        List of models to exclude from selection.
    include_models : Optional[List[str]]
        Optional exact list of models to include. Mutually exclusive with
        exclude_models.
    score_metric : Literal["accuracy", "precision", "recall", "f1", "balanced_accuracy", "jaccard", "roc_auc_score"]
        Metric used to evaluate model performance.
    search_profile : Literal["quick", "balanced", "thorough"]
        Candidate set to search when include_models is not supplied.
    optimization_method : Literal["random_search", "bayes"]
        Method for hyperparameter optimization.
    n_iterations : int
        Number of iterations for optimization. Must be at least 1.
    random_state : Optional[int]
        Random state for reproducibility.
    n_jobs : Optional[int]
        Number of worker threads for supported estimators. Use None to
        keep estimator defaults.
    selection_strategy : Literal["single_split", "nested_cv", "repeated_cv"]
        Whether to select the final candidate by one validation split or
        by nested CV over non-holdout modeling data. ``repeated_cv`` is a
        deprecated alias for ``nested_cv``.
    selection_cv_splits : int
        Number of stratified folds for repeated-CV selection.
    selection_cv_repeats : int
        Number of repeats for repeated-CV selection.
    selection_practical_margin : float
        Maximum mean-score difference treated as a practical tie before
        preferring lower score variance and faster runtime.
    preprocessing_profile : Literal["auto", "generic_ohe"]
        ``auto`` lets each candidate use a model-aware preprocessing
        profile. ``generic_ohe`` preserves the legacy shared one-hot
        preprocessing behavior.
    validation_size : float
        Fraction of the modeling data reserved for model selection.
    holdout_size : Optional[float]
        Optional fraction of the original data reserved for final evaluation.
        Holdout data is never used for model or ensemble selection.
    save_models : bool
        Whether to save fitted candidate models during fit.
    models_output_dir : str
        Directory for fitted model artifacts when save_models=True.
    refit_final_model : bool
        Whether ``fit`` refits (single-split selection) or re-tunes
        (nested-CV selection) the selected model on all non-holdout
        modeling data before exposing it as the best model.
    verbose : bool
        Verbosity flag forwarded to the ModelSelector.
    evidence_cv_splits : int
        Number of stratified folds used in evidence score stability checks.
    evidence_cv_repeats : int
        Number of repeats used in evidence score stability checks.
    evidence_confidence_level : float
        Confidence level used for evidence score intervals.
    evidence_practical_margin : float
        Minimum metric difference required before evidence challenges the
        validation-selected model.
    **preprocessor_kwargs
        Additional keyword arguments for the Preprocessor.

    Raises
    ------
    ValueError
        If any argument is outside its documented set or range, if both
        include_models and exclude_models are given, or if either list
        names a model that is not available.
    """
    # --- Argument validation -------------------------------------------
    if score_metric not in metric_dict:
        valid_metrics = ", ".join(sorted(metric_dict))
        raise ValueError(f"score_metric must be one of: {valid_metrics}.")
    if optimization_method not in {"random_search", "bayes"}:
        raise ValueError(
            "optimization_method must be one of: 'random_search', 'bayes'."
        )
    if search_profile not in {"quick", "balanced", "thorough"}:
        raise ValueError(
            "search_profile must be one of: 'quick', 'balanced', 'thorough'."
        )
    if not isinstance(n_iterations, int) or n_iterations < 1:
        raise ValueError(
            "n_iterations must be an integer greater than or equal to 1."
        )
    if n_jobs is not None and (
        not isinstance(n_jobs, int) or n_jobs == 0 or n_jobs < -1
    ):
        raise ValueError("n_jobs must be None, -1, or a positive integer.")
    if selection_strategy not in {"single_split", "nested_cv", "repeated_cv"}:
        raise ValueError(
            "selection_strategy must be one of: 'single_split', 'nested_cv', "
            "'repeated_cv'."
        )
    if selection_cv_splits < 2:
        raise ValueError("selection_cv_splits must be at least 2.")
    if selection_cv_repeats < 1:
        raise ValueError("selection_cv_repeats must be at least 1.")
    if selection_practical_margin < 0:
        raise ValueError("selection_practical_margin must be non-negative.")
    if preprocessing_profile not in {"auto", "generic_ohe"}:
        raise ValueError(
            "preprocessing_profile must be one of: 'auto', 'generic_ohe'."
        )
    self._validate_split_size(validation_size, "validation_size")
    if holdout_size is not None:
        self._validate_split_size(holdout_size, "holdout_size")
    if evidence_cv_splits < 2:
        raise ValueError("evidence_cv_splits must be at least 2.")
    if evidence_cv_repeats < 1:
        raise ValueError("evidence_cv_repeats must be at least 1.")
    if not 0 < evidence_confidence_level < 1:
        raise ValueError(
            "evidence_confidence_level must be greater than 0 and less than 1."
        )
    if evidence_practical_margin < 0:
        raise ValueError("evidence_practical_margin must be non-negative.")
    if include_models and exclude_models:
        raise ValueError("Use include_models or exclude_models, not both.")

    # Copy the caller's lists so later mutation on either side is isolated.
    known_models = set(available_model_names())
    exclude_models = list(exclude_models or [])
    include_models = list(include_models) if include_models is not None else None
    unknown_models = sorted(set(exclude_models) - known_models)
    if unknown_models:
        valid_models = ", ".join(available_model_names())
        raise ValueError(
            f"exclude_models contains unsupported model names: {unknown_models}. "
            f"Valid model names are: {valid_models}."
        )
    if include_models is not None:
        unknown_models = sorted(set(include_models) - known_models)
        if unknown_models:
            valid_models = ", ".join(available_model_names())
            raise ValueError(
                f"include_models contains unsupported model names: {unknown_models}. "
                f"Valid model names are: {valid_models}."
            )

    # --- Configuration -------------------------------------------------
    self.preprocess = preprocess
    self.exclude_models = exclude_models
    self.include_models = include_models
    self.score_metric = metric_dict[score_metric]
    self.score_metric_name = self.score_metric.__name__
    self.search_profile = search_profile
    self.optimization_method = optimization_method
    self.n_iterations = n_iterations
    self.random_state = random_state
    self.n_jobs = n_jobs
    self.selection_strategy = selection_strategy
    self.selection_cv_splits = selection_cv_splits
    self.selection_cv_repeats = selection_cv_repeats
    self.selection_practical_margin = selection_practical_margin
    self.preprocessing_profile = preprocessing_profile
    self.validation_size = validation_size
    self.holdout_size = holdout_size
    self.save_models = save_models
    self.models_output_dir = models_output_dir
    self.refit_final_model = refit_final_model
    self.verbose = verbose
    self.evidence_cv_splits = evidence_cv_splits
    self.evidence_cv_repeats = evidence_cv_repeats
    self.evidence_confidence_level = evidence_confidence_level
    self.evidence_practical_margin = evidence_practical_margin
    self.preprocessor_kwargs = preprocessor_kwargs.copy()
    # An explicit imbalance_threshold in preprocessor_kwargs wins; otherwise
    # imb_threshold is used, so both names always agree on one value.
    self.preprocessor_kwargs.setdefault("imbalance_threshold", imb_threshold)
    self.imb_threshold = self.preprocessor_kwargs["imbalance_threshold"]
    self.preprocessor = (
        Preprocessor(**self.preprocessor_kwargs) if preprocess else None
    )
    self.le = LabelEncoder()

    # --- State populated by fit() --------------------------------------
    self.model_selector = None
    self.X = None
    self.y = None
    self.y_encoded_ = None
    self.X_train = None
    self.X_validation = None
    self.X_holdout = None
    self.y_train = None
    self.y_validation = None
    self.y_holdout = None
    self.X_modeling_raw_ = None
    self.y_modeling_raw_ = None
    self.y_modeling_original_ = None
    self.X_train_raw_ = None
    self.X_validation_raw_ = None
    self.y_train_raw_ = None
    self.y_validation_raw_ = None
    self.X_holdout_raw_ = None
    self.y_holdout_original_ = None
    self.groups_ = None
    self.groups_modeling_ = None
    self.groups_train_ = None
    self.groups_validation_ = None
    self.groups_holdout_ = None
    self.X_test = None
    self.y_test = None
    self.binary = None
    self.roc = None
    self.raw_fitted_models_ = None
    self.candidate_pipelines_ = None
    self.fitted_preprocessors_ = None
    self.fitted_training_data_ = None
    self.fitted_validation_data_ = None
    self.fitted_models_ = None
    self.selected_estimator_ = None
    self.validation_selected_estimator_ = None
    self.final_preprocessor_ = None
    self.final_estimator_ = None
    self.best_model_ = None
    self.best_score_ = None
    self.best_validation_score_ = None
    self.holdout_score_ = None
    self.validation_summary_ = None
    self.holdout_summary_ = None
    self.training_summary_ = None
    self.selection_summary_ = None
    self.optuna_studies_ = None
    self.models_output_path_ = None
    self.evidence_report_ = None
    self.validation_integrity_ = None
    self.leakage_checks_ = None
    self.baseline_comparison_ = None
    self.score_stability_ = None
    self.selection_guidance_ = None
    self.report_result_ = None
    self.ensemble_ = None
    self.greedy_ensemble_ = None
    self.ensemble_models_ = None
    self.imbalanced_ = None
def fit(
    self,
    X: pd.DataFrame,
    y: pd.Series,
    X_holdout: Optional[pd.DataFrame] = None,
    y_holdout: Optional[pd.Series] = None,
    groups: Optional[pd.Series] = None,
    groups_holdout: Optional[pd.Series] = None,
):
    """
    Fits the model to the data.

    The data is split into train/validation parts (plus an optional
    holdout), candidate models are compared by a ModelSelector, one
    estimator is selected according to ``selection_strategy``, optionally
    refit on all modeling data, and finally scored on the holdout when one
    exists.

    Parameters
    ----------
    X : pd.DataFrame
        The input features.
    y : pd.Series
        The target variable.
    X_holdout : Optional[pd.DataFrame]
        Optional final holdout features. If provided, y_holdout must also
        be provided and holdout_size must be None.
    y_holdout : Optional[pd.Series]
        Optional final holdout target. Holdout rows are never used for
        model or ensemble selection.
    groups : Optional[pd.Series]
        Group labels for observations that must remain in the same fold.
    groups_holdout : Optional[pd.Series]
        Group labels for explicit holdout rows. Required with ``groups``
        and explicit holdout data so overlap can be rejected.

    Returns
    -------
    Pipeline
        The best model pipeline.

    Raises
    ------
    ValueError
        If the holdout/group arguments are inconsistent with each other or
        with ``holdout_size``, if a group vector has the wrong length, or
        if explicit holdout groups overlap the modeling groups.
    """
    # --- Argument consistency checks -----------------------------------
    if (X_holdout is None) != (y_holdout is None):
        raise ValueError("X_holdout and y_holdout must be provided together.")
    if X_holdout is not None and self.holdout_size is not None:
        raise ValueError(
            "Use either holdout_size or explicit X_holdout/y_holdout, not both."
        )
    if groups_holdout is not None and X_holdout is None:
        raise ValueError("groups_holdout requires explicit X_holdout/y_holdout.")
    if groups_holdout is not None and groups is None:
        raise ValueError("groups_holdout requires groups.")
    if X_holdout is not None and groups is not None and groups_holdout is None:
        raise ValueError(
            "groups_holdout is required with groups and explicit holdout data."
        )

    # --- Reset state left over from a previous fit ---------------------
    self.preprocessor = (
        Preprocessor(**self.preprocessor_kwargs) if self.preprocess else None
    )
    self.X_holdout = None
    self.y_holdout = None
    self.X_holdout_raw_ = None
    self.y_holdout_original_ = None
    self.holdout_summary_ = None
    self.holdout_score_ = None
    self.models_output_path_ = None
    self.evidence_report_ = None
    self.validation_integrity_ = None
    self.leakage_checks_ = None
    self.baseline_comparison_ = None
    self.score_stability_ = None
    self.selection_guidance_ = None
    self.report_result_ = None
    self.selected_estimator_ = None
    self.validation_selected_estimator_ = None
    self.final_preprocessor_ = None
    self.final_estimator_ = None
    self.fitted_preprocessors_ = None
    self.fitted_training_data_ = None
    self.fitted_validation_data_ = None
    self.candidate_pipelines_ = None
    self.groups_ = None
    self.groups_modeling_ = None
    self.groups_train_ = None
    self.groups_validation_ = None
    self.groups_holdout_ = None
    self.imbalanced_ = False

    # --- Target and group preparation ----------------------------------
    Mamut._check_categorical(y)
    # Align the target (and groups below) to X's index so that positional
    # splits keep rows and labels together.
    y_original = pd.Series(y).copy()
    y_original.index = X.index
    if y_original.value_counts(normalize=True).min() < self.imb_threshold:
        self.imbalanced_ = True
    y_encoded = pd.Series(
        self.le.fit_transform(y_original),
        index=X.index,
        name=y_original.name,
    )
    groups_encoded = None
    if groups is not None:
        if len(groups) != len(X):
            raise ValueError("groups must have the same length as X.")
        groups_encoded = pd.Series(groups).copy()
        groups_encoded.index = X.index
        self.groups_ = groups_encoded

    X_modeling = X.copy()
    y_modeling = y_encoded.copy()
    y_modeling_original = y_original.copy()
    groups_modeling = groups_encoded.copy() if groups_encoded is not None else None

    # --- Holdout: explicit, carved out by holdout_size, or absent ------
    if X_holdout is not None:
        Mamut._check_categorical(pd.Series(y_holdout))
        y_holdout_original = pd.Series(y_holdout).copy()
        y_holdout_original.index = X_holdout.index
        # Uses the encoder fitted on y above, so holdout labels share the
        # training label codes.
        y_holdout_encoded = pd.Series(
            self.le.transform(y_holdout_original),
            index=X_holdout.index,
            name=y_holdout_original.name,
        )
        self.X_holdout_raw_ = X_holdout.copy()
        self.y_holdout_original_ = y_holdout_original
        if groups_holdout is not None:
            if len(groups_holdout) != len(X_holdout):
                raise ValueError(
                    "groups_holdout must have the same length as X_holdout."
                )
            self.groups_holdout_ = pd.Series(groups_holdout).copy()
            self.groups_holdout_.index = X_holdout.index
            overlap = set(groups_modeling).intersection(self.groups_holdout_)
            if overlap:
                raise ValueError(
                    "Explicit holdout groups overlap modeling groups; "
                    "holdout evaluation would not be independent."
                )
    elif self.holdout_size is not None:
        (
            X_modeling,
            self.X_holdout_raw_,
            y_modeling,
            y_holdout_encoded,
            y_modeling_original,
            self.y_holdout_original_,
            groups_modeling,
            self.groups_holdout_,
        ) = self._split_data(
            X_modeling,
            y_modeling,
            y_modeling_original,
            groups_modeling,
            test_size=self.holdout_size,
        )
    else:
        y_holdout_encoded = None

    self.X_modeling_raw_ = X_modeling.copy()
    self.y_modeling_raw_ = y_modeling.copy()
    self.y_modeling_original_ = y_modeling_original.copy()
    self.groups_modeling_ = (
        groups_modeling.copy() if groups_modeling is not None else None
    )

    # --- Train / validation split of the modeling data -----------------
    (
        X_train_raw,
        X_validation_raw,
        y_train,
        y_validation,
        _,
        _,
        self.groups_train_,
        self.groups_validation_,
    ) = self._split_data(
        X_modeling,
        y_modeling,
        y_modeling_original,
        groups_modeling,
        test_size=self.validation_size,
    )
    X_train = X_train_raw
    X_validation = X_validation_raw
    self.y_train_raw_ = y_train.copy()
    self.y_validation_raw_ = y_validation.copy()

    # The shared preprocessor is only applied up front for the legacy
    # "generic_ohe" profile; otherwise raw data is handed to the selector
    # together with a preprocessor factory.
    use_shared_preprocessor = self.preprocess and (
        self.preprocessing_profile == "generic_ohe"
    )
    if use_shared_preprocessor:
        X_train, y_train = self.preprocessor.fit_transform(X_train, y_train)
        X_validation = self.preprocessor.transform(X_validation)
        if self.X_holdout_raw_ is not None:
            self.X_holdout = self.preprocessor.transform(self.X_holdout_raw_)
    elif self.X_holdout_raw_ is not None:
        self.X_holdout = self.X_holdout_raw_

    self.X_train = self._as_model_input(X_train)
    self.X_validation = self._as_model_input(X_validation)
    self.y_train = np.asarray(y_train)
    self.y_validation = np.asarray(y_validation)
    if self.X_holdout is not None:
        self.X_holdout = self._as_model_input(self.X_holdout)
        self.y_holdout = np.asarray(y_holdout_encoded)
    self.X_train_raw_ = X_train_raw
    self.X_validation_raw_ = X_validation_raw
    self.X = X.copy()
    self.y = y_original
    self.y_encoded_ = y_encoded
    # Backward-compatible aliases. These represent validation data, not a
    # final test set.
    self.X_test = self.X_validation
    self.y_test = self.y_validation

    # --- Candidate comparison ------------------------------------------
    self.model_selector = ModelSelector(
        self.X_train,
        self.y_train,
        self.X_validation,
        self.y_validation,
        X_train_raw=self.X_train_raw_,
        y_train_raw=self.y_train_raw_,
        X_validation_raw=self.X_validation_raw_,
        y_validation_raw=self.y_validation_raw_,
        groups_train_raw=self.groups_train_,
        preprocessor_factory=self._make_model_preprocessor,
        exclude_models=self.exclude_models,
        include_models=self.include_models,
        search_profile=self.search_profile,
        score_metric=self.score_metric,
        optimization_method=self.optimization_method,
        n_iterations=self.n_iterations,
        random_state=self.random_state,
        n_jobs=self.n_jobs,
        verbose=self.verbose,
    )
    (
        best_model,
        _,
        score_for_best_model,
        fitted_models,
        fitted_preprocessors,
        fitted_training_data,
        fitted_validation_data,
        validation_summary,
        studies,
    ) = self.model_selector.compare_models()
    self.raw_fitted_models_ = fitted_models
    self.fitted_preprocessors_ = fitted_preprocessors
    self.fitted_training_data_ = fitted_training_data
    self.fitted_validation_data_ = fitted_validation_data
    self.optuna_studies_ = studies
    self.fitted_models_ = [
        self._make_public_pipeline(model, self.fitted_preprocessors_[model_name])
        for model_name, model in fitted_models.items()
    ]
    # Update the score metric based on binary/multiclass problem (for ensembles)
    self.score_metric = self.model_selector.score_metric
    self.score_metric_name = self.model_selector.score_metric_name
    self.binary = self.model_selector.binary
    self.roc = self.model_selector.roc
    validation_summary = validation_summary.sort_values(
        by=self.score_metric_name, ascending=False
    ).reset_index(drop=True)

    # --- Model selection -----------------------------------------------
    self.best_validation_score_ = score_for_best_model
    self.validation_selected_estimator_ = best_model
    self.selected_estimator_ = best_model
    self.selection_summary_ = self._build_selection_summary(
        validation_summary, selected_model_name=best_model.__class__.__name__
    )
    if self.selection_strategy in {"nested_cv", "repeated_cv"}:
        if self.selection_strategy == "repeated_cv":
            warnings.warn(
                "selection_strategy='repeated_cv' is deprecated; use "
                "'nested_cv' for nested, fold-local model selection.",
                DeprecationWarning,
                stacklevel=2,
            )
        # Nested CV replaces the single-split summary and may pick a
        # different candidate than the validation winner.
        self.selection_summary_ = self.model_selector.nested_cv_selection_scores(
            X=self.X_modeling_raw_,
            y=self.y_modeling_raw_,
            groups=self.groups_modeling_,
            cv_splits=self.selection_cv_splits,
            cv_repeats=self.selection_cv_repeats,
            confidence_level=self.evidence_confidence_level,
        )
        selection_decision = repeated_cv_selection_decision(
            self.selection_summary_,
            practical_margin=self.selection_practical_margin,
        )
        selected_model_name = selection_decision["model"]
        self.selected_estimator_ = self.raw_fitted_models_[selected_model_name]
        self.selection_summary_ = self._mark_selected_model(
            self.selection_summary_,
            selected_model_name=selected_model_name,
            selection_strategy="nested_cv",
            selection_decision_status=selection_decision["status"],
        )
        score_for_best_model = float(
            self.selection_summary_.loc[
                self.selection_summary_["model"].eq(selected_model_name),
                "mean_score",
            ].iloc[0]
        )
    self.best_score_ = score_for_best_model

    # Per-model artifacts are keyed by the estimator's class name.
    selected_model_name = self.selected_estimator_.__class__.__name__
    public_preprocessor = self.fitted_preprocessors_.get(selected_model_name)
    self.preprocessor = public_preprocessor
    self.X_train, self.y_train = self.fitted_training_data_[selected_model_name]
    self.X_validation, self.y_validation = self.fitted_validation_data_[
        selected_model_name
    ]
    if self.X_holdout_raw_ is not None:
        self.X_holdout = self._transform_for_model(
            selected_model_name, self.X_holdout_raw_
        )

    # --- Optional final refit on all modeling data ---------------------
    if self.refit_final_model:
        if self.selection_strategy in {"nested_cv", "repeated_cv"}:
            self.final_preprocessor_, self.final_estimator_ = (
                self._retune_selected_model_on_modeling_data(selected_model_name)
            )
        else:
            self.final_preprocessor_, self.final_estimator_ = (
                self._refit_selected_model(self.selected_estimator_)
            )
        self.selected_estimator_ = self.final_estimator_
        public_preprocessor = self.final_preprocessor_
        self.preprocessor = public_preprocessor
        # The holdout must go through the preprocessor that was refit with
        # the final estimator, not the one from candidate comparison.
        if self.X_holdout_raw_ is not None:
            if self.final_preprocessor_ is not None:
                self.X_holdout = self.final_preprocessor_.transform(
                    self.X_holdout_raw_.copy()
                )
            else:
                self.X_holdout = self._as_model_input(self.X_holdout_raw_)
    self.best_model_ = self._make_public_pipeline(
        self.selected_estimator_, public_preprocessor
    )

    # --- Summaries and holdout evaluation ------------------------------
    self.validation_summary_ = validation_summary
    self.training_summary_ = validation_summary
    self.holdout_summary_ = (
        self._score_models_on_dataset(
            self.raw_fitted_models_, self.X_holdout_raw_, self.y_holdout
        )
        if self.X_holdout_raw_ is not None
        else None
    )
    if self.holdout_summary_ is not None and self.refit_final_model:
        # The summary above scored the pre-refit candidate; overwrite the
        # selected model's row with the refit estimator's holdout scores.
        final_scores = self._score_model_with_metrics(
            self.selected_estimator_,
            self.X_holdout,
            self.y_holdout,
        )
        selected_row = self.holdout_summary_["model"].eq(selected_model_name)
        for metric_name, score in final_scores.items():
            self.holdout_summary_.loc[selected_row, metric_name] = score
    self.holdout_score_ = (
        self._score_selected_model_on_holdout()
        if self.X_holdout_raw_ is not None
        else None
    )
    # Report the estimator that was actually selected; under nested CV it
    # can differ from the single-split validation winner (best_model).
    log.info("Best model: %s", selected_model_name)
    if self.save_models:
        self._save_fitted_models()
    return self.best_model_
def _build_selection_summary(
self, validation_summary: pd.DataFrame, *, selected_model_name: str
) -> pd.DataFrame:
summary = validation_summary.copy()
summary["metric"] = self.score_metric_name
summary["mean_score"] = summary[self.score_metric_name]
summary["std_score"] = np.nan
summary["ci_low"] = np.nan
summary["ci_high"] = np.nan
summary["n_scores"] = 1
summary["selection_duration"] = summary["duration"]
summary["status"] = "ok"
return self._mark_selected_model(
summary,
selected_model_name=selected_model_name,
selection_strategy="single_split",
)
@staticmethod
def _mark_selected_model(
summary: pd.DataFrame,
*,
selected_model_name: str,
selection_strategy: str,
selection_decision_status: str = "confirmed",
) -> pd.DataFrame:
summary = summary.copy()
summary["selected"] = summary["model"].eq(selected_model_name)
summary["selection_strategy"] = selection_strategy
summary["selection_decision_status"] = selection_decision_status
return summary
def _save_fitted_models(self) -> None:
    """Dump every fitted public pipeline into a fresh timestamped directory.

    The directory is ``<cwd>/<models_output_dir>/<timestamp>`` and is
    recorded in ``models_output_path_``. Each pipeline in
    ``fitted_models_`` is written with joblib as ``<ModelClassName>.joblib``.
    """
    # Use dashes in the time part: colons are not valid in Windows file
    # names, so the previous "%H:%M:%S" layout failed there.
    timestamp = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
    models_dir = os.path.join(os.getcwd(), self.models_output_dir, timestamp)
    os.makedirs(models_dir, exist_ok=True)
    self.models_output_path_ = models_dir
    for model in self.fitted_models_:
        model_name = self._pipeline_model_name(model)
        model_path = os.path.join(models_dir, f"{model_name}.joblib")
        joblib.dump(model, model_path)
        log.info("Saved model %s to %s", model_name, model_path)
def _make_public_pipeline(self, estimator, preprocessor) -> Pipeline:
    """Build the user-facing pipeline for ``estimator``.

    The final ``"model"`` step wraps the estimator in a
    LabelDecodedClassifier bound to this instance's label encoder. A
    ``"preprocessor"`` step is placed in front of it only when a
    preprocessor is given.
    """
    model_step = ("model", LabelDecodedClassifier(estimator, self.le))
    if preprocessor is None:
        return Pipeline([model_step])
    return Pipeline([("preprocessor", preprocessor), model_step])
@staticmethod
def _pipeline_model_name(model: Pipeline) -> str:
final_step = model.named_steps["model"]
estimator = getattr(final_step, "estimator", final_step)
return estimator.__class__.__name__
def _refit_selected_model(self, selected_estimator):
    """Refit a clone of ``selected_estimator`` on all modeling data.

    A new preprocessor is requested for the estimator's class name; when
    one is returned it is fitted on copies of the raw modeling data,
    otherwise the raw data is used directly.

    Returns
    -------
    tuple
        ``(preprocessor, final_estimator)``; ``preprocessor`` may be None.
    """
    model_name = selected_estimator.__class__.__name__
    preprocessor = self._make_model_preprocessor(model_name)
    if preprocessor is None:
        features = self._as_model_input(self.X_modeling_raw_)
        target = np.asarray(self.y_modeling_raw_)
    else:
        # Copies keep the stored raw modeling data out of the transformer's reach.
        features, target = preprocessor.fit_transform(
            self.X_modeling_raw_.copy(),
            self.y_modeling_raw_.copy(),
        )
    # clone() yields an unfitted estimator carrying the same parameters.
    refit_estimator = clone(selected_estimator)
    fit_estimator(refit_estimator, features, target)
    return preprocessor, refit_estimator
def _retune_selected_model_on_modeling_data(self, selected_model_name: str):
    """Tune and fit only the selected model, training on all modeling data.

    Builds a dedicated ModelSelector restricted to ``selected_model_name``
    and stores the resulting study in ``optuna_studies_`` under that name.

    Returns
    -------
    tuple
        ``(preprocessor, estimator)`` as produced by ``tune_and_fit_model``.
    """
    modeling_X = self.X_modeling_raw_
    modeling_y = self.y_modeling_raw_
    validation_X = self.X_validation_raw_
    validation_y = self.y_validation_raw_
    # NOTE(review): in fit() the validation rows are split out of the
    # modeling data, so they also appear in the training data passed here.
    selector_options = {
        "score_metric": self.model_selector.base_score_metric,
        "X_train_raw": modeling_X,
        "y_train_raw": modeling_y,
        "X_validation_raw": validation_X,
        "y_validation_raw": validation_y,
        "groups_train_raw": self.groups_modeling_,
        "preprocessor_factory": self._make_model_preprocessor,
        "include_models": [selected_model_name],
        "search_profile": self.search_profile,
        "optimization_method": self.optimization_method,
        "n_iterations": self.n_iterations,
        "random_state": self.random_state,
        "n_jobs": self.n_jobs,
        "verbose": self.verbose,
    }
    single_model_selector = ModelSelector(
        self._as_model_input(modeling_X),
        np.asarray(modeling_y),
        self._as_model_input(validation_X),
        np.asarray(validation_y),
        **selector_options,
    )
    tuned = single_model_selector.tune_and_fit_model(selected_model_name)
    # The third and fourth items of the result are not needed here.
    estimator, preprocessor, _, _, study = tuned
    self.optuna_studies_[selected_model_name] = study
    return preprocessor, estimator
def _make_candidate_pipeline(self, model_name: str, estimator):
    """Wrap a clone of ``estimator`` in a CandidatePipelineClassifier.

    The preprocessing profile is taken from the preprocessor created for
    ``model_name``; when none is created, preprocessing is switched off and
    the profile falls back to ``"generic_ohe"``.
    """
    preprocessor = self._make_model_preprocessor(model_name)
    uses_preprocessing = preprocessor is not None
    if uses_preprocessing:
        profile_name = preprocessor.profile
    else:
        profile_name = "generic_ohe"
    return CandidatePipelineClassifier(
        estimator=clone(estimator),
        preprocess=uses_preprocessing,
        profile=profile_name,
        preprocessor_kwargs=self.preprocessor_kwargs.copy(),
    )
def _ensemble_estimators(self, model_names=None):
    """Return ``(name, candidate_pipeline)`` pairs for ensemble building.

    Uses every raw fitted model when ``model_names`` is empty or None.
    """
    if model_names:
        chosen_names = model_names
    else:
        chosen_names = list(self.raw_fitted_models_)
    named_pipelines = []
    for name in chosen_names:
        candidate = self._make_candidate_pipeline(
            name, self.raw_fitted_models_[name]
        )
        named_pipelines.append((name, candidate))
    return named_pipelines
def _fitted_candidate_pipelines(self) -> dict:
    """Map each model name to a candidate pipeline carrying fitted state.

    Every pipeline gets ``estimator_``, ``preprocessor_`` and ``classes_``
    set from already-fitted objects. When ``refit_final_model`` is on, the
    selected model uses the refit estimator and final preprocessor instead
    of the ones from candidate comparison.
    """
    final_name = None
    if self.selected_estimator_ is not None:
        final_name = self.selected_estimator_.__class__.__name__

    fitted_pipelines = {}
    for name, raw_estimator in self.raw_fitted_models_.items():
        use_final = self.refit_final_model and name == final_name
        if use_final:
            fitted_estimator = self.selected_estimator_
            fitted_preprocessor = self.final_preprocessor_
        else:
            fitted_estimator = raw_estimator
            fitted_preprocessor = self.fitted_preprocessors_.get(name)
        candidate = self._make_candidate_pipeline(name, fitted_estimator)
        # _make_candidate_pipeline clones the estimator, so the fitted
        # objects are attached explicitly.
        candidate.estimator_ = fitted_estimator
        candidate.preprocessor_ = fitted_preprocessor
        candidate.classes_ = fitted_estimator.classes_
        fitted_pipelines[name] = candidate
    return fitted_pipelines
def _split_data(self, X, y, y_original, groups, *, test_size: float):
    """Split aligned data into a training part and an evaluation part.

    Without ``groups`` a single stratified ``train_test_split`` is used.
    With ``groups`` 32 group-disjoint shuffles are tried; shuffles that
    lose a target class on either side are discarded, and the remaining
    one whose evaluation part is closest to the overall class distribution
    and the requested size is kept.

    Returns
    -------
    tuple
        ``(X_train, X_eval, y_train, y_eval, y_original_train,
        y_original_eval, groups_train, groups_eval)``; the two group
        entries are None when ``groups`` is None.

    Raises
    ------
    ValueError
        If no group-disjoint shuffle keeps every class in both parts.
    """
    if groups is None:
        stratified_parts = train_test_split(
            X,
            y,
            y_original,
            test_size=test_size,
            stratify=y,
            random_state=self.random_state,
        )
        return (*stratified_parts, None, None)

    class_shares = pd.Series(y).value_counts(normalize=True)
    n_classes = class_shares.size
    group_splitter = GroupShuffleSplit(
        n_splits=32,
        test_size=test_size,
        random_state=self.random_state,
    )
    best_error = None
    best_rows = None
    for fit_rows, eval_rows in group_splitter.split(X, y, groups):
        fit_target = y.iloc[fit_rows]
        eval_target = y.iloc[eval_rows]
        if fit_target.nunique() < n_classes or eval_target.nunique() < n_classes:
            continue
        eval_shares = eval_target.value_counts(normalize=True).reindex(
            class_shares.index, fill_value=0.0
        )
        balance_error = float((eval_shares - class_shares).abs().sum())
        size_error = abs((len(eval_rows) / len(X)) - test_size)
        total_error = balance_error + size_error
        # Strict "<" keeps the earliest shuffle when errors tie.
        if best_rows is None or total_error < best_error:
            best_error = total_error
            best_rows = (fit_rows, eval_rows)

    if best_rows is None:
        raise ValueError(
            "Unable to construct a group-disjoint split containing every "
            "target class in both partitions."
        )
    # Train part then evaluation part for each of X, y, y_original, groups.
    return tuple(
        part.iloc[rows]
        for part in (X, y, y_original, groups)
        for rows in best_rows
    )
def _score_selected_model_on_holdout(self) -> float:
    """Score the selected estimator on the raw holdout data.

    The holdout features go through the final preprocessor when the model
    was refit (or are used as-is if there is none), and through the
    selected model's comparison-time preprocessor otherwise.
    """
    estimator = self.selected_estimator_
    if not self.refit_final_model:
        holdout_features = self._transform_for_model(
            estimator.__class__.__name__, self.X_holdout_raw_
        )
    elif self.final_preprocessor_ is not None:
        holdout_features = self.final_preprocessor_.transform(self.X_holdout_raw_)
    else:
        holdout_features = self._as_model_input(self.X_holdout_raw_)
    return self._score_model_on_dataset(
        estimator, holdout_features, self.y_holdout
    )
@staticmethod
def _as_model_input(data):
if isinstance(data, pd.DataFrame):
return data.values
return data
def _score_models_on_dataset(self, models: dict, X, y) -> pd.DataFrame:
duration_by_model = {}
if isinstance(self.validation_summary_, pd.DataFrame):
duration_by_model = dict(
zip(
self.validation_summary_["model"],
self.validation_summary_["duration"],
)
)
model_order = (
self.validation_summary_["model"].to_list()
if isinstance(self.validation_summary_, pd.DataFrame)
else list(models)
)
rows = []
for model_name in model_order:
model = models[model_name]
X_model = self._transform_for_model(model_name, X)
rows.append(
{
"model": model_name,
**self._score_model_with_metrics(model, X_model, y),
"duration": duration_by_model.get(model_name, np.nan),
}
)
return pd.DataFrame(rows)
def _transform_for_model(self, model_name: str, X: pd.DataFrame):
if X is None:
return None
preprocessor = (
self.fitted_preprocessors_.get(model_name)
if isinstance(self.fitted_preprocessors_, dict)
else None
)
if preprocessor is None:
return self._as_model_input(X)
return preprocessor.transform(X.copy())
def _score_model_with_metrics(self, fitted_model, X, y) -> dict:
    """
    Compute the full metric set for one fitted model.

    Parameters
    ----------
    fitted_model : Any
        Model exposing ``predict`` and ``predict_proba``.
    X : Any
        Features already prepared for the model.
    y : Any
        True targets; converted with ``np.asarray``.

    Returns
    -------
    dict
        Metric name to value, with ``self.score_metric_name`` placed first.
        ``roc_auc_score`` is NaN when scikit-learn raises ``ValueError``.
    """
    y_true = np.asarray(y)
    labels = fitted_model.predict(X)
    probabilities = fitted_model.predict_proba(X)
    if self.binary:
        # Binary AUC is computed from the second probability column only.
        probabilities = probabilities[:, 1]

    try:
        auc = roc_auc_score(
            y_true,
            probabilities,
            multi_class="ovr",
            average="weighted",
        )
    except ValueError:
        auc = np.nan

    weighted = {"average": "weighted", "zero_division": 0}
    metrics = {
        "accuracy_score": accuracy_score(y_true, labels),
        "balanced_accuracy_score": balanced_accuracy_score(y_true, labels),
        "precision_score": precision_score(y_true, labels, **weighted),
        "recall_score": recall_score(y_true, labels, **weighted),
        "f1_score": f1_score(y_true, labels, **weighted),
        "jaccard_score": jaccard_score(y_true, labels, **weighted),
        "roc_auc_score": auc,
    }

    # Move the optimisation metric to the front, keep the rest in order.
    primary = self.score_metric_name
    ordered = {primary: metrics.pop(primary)}
    ordered.update(metrics)
    return ordered
def _score_model_on_dataset(self, fitted_model, X, y) -> float:
if self.roc:
if self.binary:
predictions = fitted_model.predict_proba(X)[:, 1]
else:
predictions = fitted_model.predict_proba(X)
else:
predictions = fitted_model.predict(X)
return self.score_metric(y, predictions)
[docs]
def predict(self, X: pd.DataFrame):
    """
    Predict the target variable for the given data.

    Parameters
    ----------
    X : pd.DataFrame
        The input features.

    Returns
    -------
    np.ndarray
        Predicted target variable, as produced by ``_predict``.
    """
    predictions = self._predict(X)
    return predictions
[docs]
def predict_proba(self, X: pd.DataFrame):
    """
    Predict class probabilities for the given data.

    Parameters
    ----------
    X : pd.DataFrame
        The input features.

    Returns
    -------
    np.ndarray
        Predicted probabilities, as produced by ``_predict`` with
        ``proba=True``.
    """
    probabilities = self._predict(X, proba=True)
    return probabilities
[docs]
def evaluate(
    self,
    n_top_models: int = 3,
    dataset: Literal["auto", "validation", "holdout"] = "auto",
    include_evidence: bool = True,
    output_dir: str = "mamut_report",
    include_shap: bool = True,
    shap_max_samples: Optional[int] = 200,
    display_plots: bool = False,
    write_html: bool = True,
    save_plots: bool = True,
) -> dict:
    """
    Evaluates the fitted models.

    Builds a ``ModelEvaluator`` from the fitted candidates and the chosen
    evaluation partition, runs ``evaluate_to_html`` on it, and stores the
    evaluator's ``report_result_`` on ``self.report_result_``.

    Parameters
    ----------
    n_top_models : int
        Forwarded to ``ModelEvaluator``; must be an ``int`` >= 1.
    dataset : Literal["auto", "validation", "holdout"]
        Evaluation partition, resolved by ``_get_evaluation_dataset``.
    include_evidence : bool
        When True, ``generate_evidence`` is run for the resolved partition
        and its result is passed to the evaluator as ``evidence_report``.
    output_dir : str
        Forwarded to the evaluator as ``report_output_path``.
    include_shap : bool
        Forwarded to the evaluator.
    shap_max_samples : Optional[int]
        Forwarded to the evaluator.
    display_plots : bool
        When True, ``plot_results_in_notebook`` is called on the evaluator.
    write_html : bool
        Forwarded to the evaluator.
    save_plots : bool
        Forwarded to the evaluator.

    Returns
    -------
    dict
        The evaluator's ``report_result_`` attribute, or ``None`` when the
        evaluator does not define it.

    Raises
    ------
    ValueError
        If ``n_top_models`` is not an integer >= 1, or if
        ``_get_evaluation_dataset`` rejects ``dataset``.
    """
    self._check_fitted()
    if not isinstance(n_top_models, int) or n_top_models < 1:
        raise ValueError(
            "n_top_models must be an integer greater than or equal to 1."
        )
    # Only the targets, the summary table and the resolved partition name
    # are needed here; the raw features are picked explicitly below.
    _, y_evaluation, evaluation_summary, evaluation_dataset = (
        self._get_evaluation_dataset(dataset)
    )
    evidence_report = (
        self.generate_evidence(dataset=evaluation_dataset)
        if include_evidence
        else None
    )
    evaluator = ModelEvaluator(
        self._fitted_candidate_pipelines(),
        # The evaluator receives raw (unpreprocessed) evaluation features.
        X_evaluation=(
            self.X_holdout_raw_
            if evaluation_dataset == "holdout"
            else self.X_validation_raw_
        ),
        y_evaluation=y_evaluation,
        X_train=self.X_train,
        y_train=self.y_train,
        X_explanation=self.X_train_raw_,
        X=self.X,
        y=self.y,
        optimizer=self.optimization_method,
        metric=self.score_metric_name,
        n_trials=self.n_iterations,
        excluded_models=self.exclude_models,
        studies=self.optuna_studies_,
        training_summary=evaluation_summary,
        pca_loadings=(
            self.preprocessor.pca_loadings_ if self.preprocessor else None
        ),
        binary=self.model_selector.binary,
        preprocessing_steps=self.preprocessor.report() if self.preprocessor else {},
        # Fall back to the original column names when the preprocessor is
        # absent or reports no output feature names.
        feature_names=(
            self.preprocessor.feature_names_out_
            if self.preprocessor and self.preprocessor.feature_names_out_
            else self.X.columns.tolist()
        ),
        n_top_models=n_top_models,
        is_ensemble=self.greedy_ensemble_ is not None,
        greedy_ensemble=self.greedy_ensemble_,
        evaluation_dataset=evaluation_dataset,
        selected_model_name=self.selected_estimator_.__class__.__name__,
        # Ranking by metric is requested only for the validation partition.
        rank_by_metric=evaluation_dataset == "validation",
        evidence_report=evidence_report,
        report_output_path=output_dir,
        include_shap=include_shap,
        shap_max_samples=shap_max_samples,
        write_html=write_html,
        save_plots=save_plots,
    )
    evaluator.evaluate_to_html(evaluation_summary)
    if display_plots:
        evaluator.plot_results_in_notebook()
    self.report_result_ = getattr(evaluator, "report_result_", None)
    return self.report_result_
[docs]
def generate_evidence(
    self,
    dataset: Literal["auto", "validation", "holdout"] = "auto",
    include_candidate_comparison: bool = True,
) -> dict:
    """Build diagnostic evidence without changing the fitted candidate.

    The report is produced by ``build_evidence_report`` and cached on
    ``self.evidence_report_``; its ``validation_integrity``,
    ``leakage_checks``, ``baseline_comparison``, ``score_stability`` and
    ``selection_guidance`` entries are also exposed as attributes.

    Parameters
    ----------
    dataset : Literal["auto", "validation", "holdout"]
        Evaluation partition to summarize.
    include_candidate_comparison : bool
        Whether to score non-selected MAMUT candidates alongside fixed
        baselines. Disable this for a locked final confirmation analysis.

    Returns
    -------
    dict
        The evidence report returned by ``build_evidence_report``.

    Raises
    ------
    ValueError
        If ``_get_evaluation_dataset`` rejects ``dataset``.
    """
    self._check_fitted()
    _, _, _, evaluation_dataset = self._get_evaluation_dataset(dataset)

    if evaluation_dataset == "holdout":
        X_evaluation_raw = self.X_holdout_raw_
        # Attach the feature index positionally. Passing an existing Series
        # to ``pd.Series(..., index=...)`` would align by label instead,
        # yielding NaN for unmatched labels and failing on duplicates.
        if isinstance(self.y_holdout, pd.Series):
            y_evaluation_raw = self.y_holdout.copy()
            y_evaluation_raw.index = X_evaluation_raw.index
        else:
            y_evaluation_raw = pd.Series(
                self.y_holdout, index=X_evaluation_raw.index
            )
        # Holdout evidence is trained on the whole modeling partition.
        X_evidence_train = self.X_modeling_raw_
        y_evidence_train = self.y_modeling_raw_
        groups_evidence_train = self.groups_modeling_
        groups_evaluation = self.groups_holdout_
    else:
        X_evaluation_raw = self.X_validation_raw_
        y_evaluation_raw = self.y_validation_raw_
        X_evidence_train = self.X_train_raw_
        y_evidence_train = self.y_train_raw_
        groups_evidence_train = self.groups_train_
        groups_evaluation = self.groups_validation_

    self.evidence_report_ = build_evidence_report(
        X=self.X_modeling_raw_,
        y=self.y_modeling_raw_,
        y_leakage=self.y_modeling_original_,
        X_train=X_evidence_train,
        y_train=y_evidence_train,
        X_evaluation=X_evaluation_raw,
        y_evaluation=y_evaluation_raw,
        selected_estimator=self.selected_estimator_,
        candidate_estimators=(
            self.raw_fitted_models_ if include_candidate_comparison else None
        ),
        metric_name=self.score_metric_name,
        binary=self.binary,
        preprocessor_factory=self._make_model_preprocessor,
        evaluation_dataset=evaluation_dataset,
        holdout_available=self.X_holdout is not None,
        groups=self.groups_modeling_,
        groups_train=groups_evidence_train,
        groups_evaluation=groups_evaluation,
        cv_splits=self.evidence_cv_splits,
        cv_repeats=self.evidence_cv_repeats,
        confidence_level=self.evidence_confidence_level,
        random_state=self.random_state,
        practical_margin=self.evidence_practical_margin,
    )

    # Expose the individual report sections as fitted attributes.
    report = self.evidence_report_
    self.validation_integrity_ = report["validation_integrity"]
    self.leakage_checks_ = report["leakage_checks"]
    self.baseline_comparison_ = report["baseline_comparison"]
    self.score_stability_ = report["score_stability"]
    self.selection_guidance_ = report["selection_guidance"]
    return self.evidence_report_
def _make_model_preprocessor(self, model_name: Optional[str] = None):
if not self.preprocess:
return None
profile = self._preprocessing_profile_for_label(model_name)
kwargs = self.preprocessor_kwargs.copy()
if profile == "native_categorical" and (
kwargs.get("pca") or kwargs.get("feature_selection")
):
profile = "tree_ohe"
if all(
[
profile == "native_categorical",
self.imbalanced_,
kwargs.get("imbalanced_resampling", True),
]
):
profile = "tree_ohe"
kwargs["profile"] = profile
return Preprocessor(**kwargs)
def _preprocessing_profile_for_label(self, model_name: Optional[str]) -> str:
if self.preprocessing_profile == "generic_ohe" or model_name is None:
return "generic_ohe"
normalized_name = self._normalize_model_label(model_name)
if normalized_name in set(available_model_names()):
return preprocessing_profile_for_model(normalized_name)
return "generic_ohe"
@staticmethod
def _normalize_model_label(model_name: str) -> str:
for prefix in ("MAMUT Candidate (", "MAMUT Selected ("):
if model_name.startswith(prefix) and model_name.endswith(")"):
return model_name[len(prefix) : -1]
baseline_aliases = {
"Logistic Regression": "LogisticRegression",
"Random Forest": "RandomForestClassifier",
"Dummy Most Frequent": "DummyClassifier",
}
return baseline_aliases.get(model_name, model_name)
def _make_evidence_preprocessor(self, model_name: Optional[str] = None):
return self._make_model_preprocessor(model_name)
def _get_evaluation_dataset(self, dataset: str):
if dataset not in {"auto", "validation", "holdout"}:
raise ValueError("dataset must be one of: 'auto', 'validation', 'holdout'.")
if dataset == "auto":
dataset = "holdout" if self.X_holdout is not None else "validation"
if dataset == "holdout":
if self.X_holdout is None or self.y_holdout is None:
raise ValueError(
"No holdout data is available. Provide holdout_size, or pass "
"X_holdout and y_holdout to fit()."
)
if self.holdout_summary_ is None:
self.holdout_summary_ = self._score_models_on_dataset(
self.raw_fitted_models_,
self.X_holdout_raw_,
self.y_holdout,
)
return (
self.X_holdout,
self.y_holdout,
self.holdout_summary_,
"holdout",
)
return (
self.X_validation,
self.y_validation,
self.validation_summary_,
"validation",
)
[docs]
def save_best_model(self, path: str) -> None:
    """
    Saves the best model into the given directory.

    The file is written as ``<model name>.joblib`` inside ``path``, where
    the model name comes from ``_pipeline_model_name(self.best_model_)``.

    Parameters
    ----------
    path : str
        Directory in which to save the best model. It is created when it
        does not exist yet; an empty string means the current directory.
    """
    self._check_fitted()
    if path:
        # joblib.dump does not create missing parent directories.
        os.makedirs(path, exist_ok=True)
    save_path = os.path.join(
        path, f"{self._pipeline_model_name(self.best_model_)}.joblib"
    )
    joblib.dump(self.best_model_, save_path)
    log.info(f"Saved best model to {save_path}")
[docs]
def create_ensemble(self, voting: Literal["soft", "hard"] = "soft") -> Pipeline:
    """
    Creates a voting ensemble from the fitted models.

    The ensemble is fitted on the raw training partition, scored on the
    raw validation partition with ``score_metric``, wrapped by
    ``_make_public_pipeline`` and stored in ``self.ensemble_``.

    Parameters
    ----------
    voting : Literal["soft", "hard"]
        Voting strategy for the ensemble.

    Returns
    -------
    Pipeline
        The ensemble model pipeline.
    """
    self._check_fitted()

    members = self._ensemble_estimators()
    voter = VotingClassifier(estimators=members, voting=voting)
    voter.fit(self.X_train_raw_, self.y_train_raw_)

    validation_predictions = voter.predict(self.X_validation_raw_)
    score = self.score_metric(self.y_validation_raw_, validation_predictions)

    self.ensemble_ = self._make_public_pipeline(voter, None)
    log.info(
        f"Created ensemble with all models and voting='{voting}'. "
        f"Ensemble score on validation set: {score:.4f} {self.score_metric.__name__}"
    )
    return self.ensemble_
def _create_greedy_ensemble_voting(
    self, n_models: int = 6, voting: Literal["soft", "hard"] = "soft"
) -> Pipeline:
    """
    Creates a greedy voting ensemble of the fitted models.

    Starting from the top-ranked model in ``validation_summary_``, each
    round fits one trial ensemble per remaining candidate and keeps the
    candidate whose ensemble scores highest on the validation partition.
    Rounds continue until ``n_models`` members are selected; the ensemble
    from the last round is the one that is kept.

    Parameters
    ----------
    n_models : int
        Number of models to include in the ensemble (capped at the number
        of fitted models).
    voting : Literal["soft", "hard"]
        Voting strategy for the ensemble.

    Returns
    -------
    Pipeline
        The greedy ensemble model pipeline.

    Raises
    ------
    ValueError
        If fewer than two models are available.
    """
    self._check_fitted()
    n_models = min(n_models, len(self.raw_fitted_models_))
    if n_models < 2:
        raise ValueError(
            "At least two fitted models are required to build an ensemble."
        )

    by_score = self.validation_summary_.sort_values(
        self.score_metric_name, ascending=False
    )
    ranked_names = by_score["model"].head(n_models).tolist()

    selected_names = [ranked_names[0]]
    best_ensemble, best_score = None, -np.inf
    while len(selected_names) < n_models:
        # (name, ensemble, score) of the strongest trial in this round.
        round_winner = (None, None, -np.inf)
        unused_names = set(ranked_names).difference(selected_names)
        for candidate_name in sorted(unused_names):
            trial = VotingClassifier(
                estimators=self._ensemble_estimators(
                    [*selected_names, candidate_name]
                ),
                voting=voting,
            )
            trial.fit(self.X_train_raw_, self.y_train_raw_)
            trial_score = self.score_metric(
                self.y_validation_raw_,
                trial.predict(self.X_validation_raw_),
            )
            # Strict comparison: on ties the alphabetically first wins.
            if trial_score > round_winner[2]:
                round_winner = (candidate_name, trial, trial_score)
        winner_name, best_ensemble, best_score = round_winner
        selected_names.append(winner_name)

    self.ensemble_models_ = selected_names
    self.greedy_ensemble_ = self._make_public_pipeline(best_ensemble, None)
    log.info(
        f"Created greedy ensemble with voting='{voting}' \n"
        f"and {n_models} models: {selected_names} \n"
        f"Ensemble score on validation set: {best_score:.4f} {self.score_metric.__name__}"
    )
    return self.greedy_ensemble_
[docs]
def create_greedy_ensemble(self, max_models=6):
    """
    Creates a greedy soft-voting ensemble of the fitted models.

    Parameters
    ----------
    max_models : int
        Passed to ``_create_greedy_ensemble_voting`` as ``n_models``.

    Returns
    -------
    Pipeline
        Whatever ``_create_greedy_ensemble_voting`` returns.
    """
    greedy_options = {"n_models": max_models, "voting": "soft"}
    return self._create_greedy_ensemble_voting(**greedy_options)
def _predict(self, X: pd.DataFrame, proba: bool = False):
"""
Predicts the target variable or probabilities for the given data.
Parameters
----------
X : pd.DataFrame
The input features.
proba : bool
Whether to predict probabilities instead of the target variable.
Returns
-------
np.ndarray
Predicted target variable or probabilities.
"""
self._check_fitted()
if proba:
return self.best_model_.predict_proba(X)
return self.best_model_.predict(X)
def _check_fitted(self):
"""
Checks if the model has been fitted.
Raises
------
RuntimeError
If the model has not been fitted.
"""
if not self.best_model_:
raise RuntimeError(
"Can't predict because no model has been fitted. "
"Please call fit() method first."
)
@staticmethod
def _check_categorical(y):
"""
Checks if the target variable is categorical.
Parameters
----------
y : pd.Series
The target variable.
Raises
------
ValueError
If the target variable is not categorical.
"""
if pd.api.types.is_float_dtype(y):
raise ValueError("Target variable must be categorical.")
@staticmethod
def _validate_split_size(value: float, name: str) -> None:
if not 0 < value < 1:
raise ValueError(f"{name} must be greater than 0 and less than 1.")