import os
import sys
import time
import inspect
from datetime import datetime

import joblib
import numpy as np
from sklearn.ensemble import ExtraTreesRegressor, GradientBoostingRegressor, RandomForestRegressor
from sklearn.feature_selection import SelectKBest, f_regression
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.preprocessing import RobustScaler

# Make the project root importable so `config` and `core` resolve when this
# script is run directly.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import config
from core.deep_learning_model import is_available as deep_learning_available
from core.deep_learning_model import train_lstm_mlp
from core.model_features import (
    NUMERICAL_OUTLIER_COLUMNS,
    ORDINAL_COLUMNS,
    TARGET_COLUMN,
    align_feature_frame,
    apply_label_encoders,
    apply_outlier_bounds,
    engineer_features,
    extract_xy,
    fit_label_encoders,
    fit_outlier_bounds,
    make_target_bins,
    normalize_columns,
    prepare_modeling_dataframe,
    to_float_array,
)
from core.preprocessing import get_clean_data

# Optional boosters: keep the trainer usable when either library is missing.
try:
    import lightgbm as lgb
except ImportError:
    lgb = None

try:
    import xgboost as xgb
except ImportError:
    xgb = None


def patch_lightgbm_sklearn_compatibility():
    """Bridge LightGBM's use of check_X_y(force_all_finite=...) against
    scikit-learn releases that renamed the keyword to ensure_all_finite."""
    if lgb is None:
        return
    try:
        from sklearn.utils.validation import check_X_y
    except Exception:
        return
    params = inspect.signature(check_X_y).parameters
    if 'force_all_finite' in params:
        # Installed scikit-learn still accepts the old keyword; nothing to do.
        return

    def wrapped_check_X_y(*args, force_all_finite=None, **kwargs):
        if (
            force_all_finite is not None
            and 'ensure_all_finite' in params
            and 'ensure_all_finite' not in kwargs
        ):
            kwargs['ensure_all_finite'] = force_all_finite
        return check_X_y(*args, **kwargs)

    try:
        import lightgbm.compat as lgb_compat
        import lightgbm.sklearn as lgb_sklearn

        lgb_compat._LGBMCheckXY = wrapped_check_X_y
        lgb_sklearn._LGBMCheckXY = wrapped_check_X_y
    except Exception:
        pass


patch_lightgbm_sklearn_compatibility()


def print_training_log(model_name, start_time, best_score, best_params, n_iter, cv_folds):
    elapsed = time.time() - start_time
    print(f' {"-" * 50}')
    print(f' Model: {model_name}')
    print(f' Time: {elapsed:.1f}s')
    print(f' Best CV R2: {best_score:.4f}')
    for key, value in best_params.items():
        print(f' - {key}: {value}')
    print(f' Iterations: {n_iter}, CV folds: {cv_folds}')
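
# Optional debugging aid (illustrative sketch; not called by the trainer):
# report which finiteness keyword the installed scikit-learn's check_X_y
# accepts. Newer scikit-learn releases renamed force_all_finite to
# ensure_all_finite, which is the gap the patch above bridges for LightGBM.
# The function name and return values are placeholders.
def report_check_x_y_keyword():
    try:
        from sklearn.utils.validation import check_X_y
    except Exception:
        return 'unavailable'
    names = inspect.signature(check_X_y).parameters
    if 'force_all_finite' in names:
        return 'force_all_finite'
    if 'ensure_all_finite' in names:
        return 'ensure_all_finite'
    return 'unknown'
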

class OptimizedModelTrainer:
    def __init__(self):
        self.models = {}
        self.scaler = RobustScaler()
        self.feature_names = None
        self.selected_features = None
        self.label_encoders = {}
        self.model_metrics = {}
        self.training_metadata = {}
        self.feature_selector = None
        self.outlier_bounds = {}
        self.feature_k = 22  # number of features kept by SelectKBest
        self.target_transform = 'log1p'
        self.enabled_models = ['random_forest', 'gradient_boosting', 'extra_trees', 'lightgbm', 'xgboost']
        if deep_learning_available():
            self.enabled_models.append('lstm_mlp')
        self.raw_train_df = None
        self.raw_test_df = None

    def analyze_data(self, df):
        y = df[TARGET_COLUMN]
        print('\nData Analysis')
        print(f' Samples: {len(df)}')
        print(f' Mean: {y.mean():.2f}, Median: {y.median():.2f}, Std: {y.std():.2f}')
        print(f' High risk ratio (>8h): {(y > 8).mean() * 100:.1f}%')

    def select_features(self, X, y, k=20):
        selector = SelectKBest(score_func=f_regression, k=min(k, X.shape[1]))
        selector.fit(X, y)
        self.feature_selector = selector
        mask = selector.get_support()
        self.selected_features = [name for name, keep in zip(self.feature_names, mask) if keep]
        return selector.transform(X)

    def transform_target(self, y):
        return np.log1p(np.clip(y, a_min=0, a_max=None)) if self.target_transform == 'log1p' else y

    def inverse_transform_target(self, y_pred):
        return np.expm1(y_pred) if self.target_transform == 'log1p' else y_pred

    def transform_features(self, X_df):
        X_df = align_feature_frame(X_df, self.feature_names)
        X = self.scaler.transform(to_float_array(X_df))
        return self.feature_selector.transform(X) if self.feature_selector is not None else X

    def prepare_data(self):
        raw_df = normalize_columns(get_clean_data())
        self.analyze_data(prepare_modeling_dataframe(raw_df.copy()))

        # Stratify the split on binned target values so train and test share a
        # similar target distribution.
        target_bins = make_target_bins(raw_df[TARGET_COLUMN].values)
        raw_train_df, raw_test_df = train_test_split(
            raw_df,
            test_size=config.TEST_SIZE,
            random_state=config.RANDOM_STATE,
            stratify=target_bins,
        )
        self.raw_train_df = raw_train_df.reset_index(drop=True)
        self.raw_test_df = raw_test_df.reset_index(drop=True)

        train_df = prepare_modeling_dataframe(self.raw_train_df)
        test_df = prepare_modeling_dataframe(self.raw_test_df)

        # Fit outlier bounds (and, below, label encoders) on the training split
        # only, then apply them to both splits to avoid leakage.
        self.outlier_bounds = fit_outlier_bounds(train_df, NUMERICAL_OUTLIER_COLUMNS)
        train_df = apply_outlier_bounds(train_df, self.outlier_bounds)
        test_df = apply_outlier_bounds(test_df, self.outlier_bounds)

        train_df = engineer_features(train_df)
        test_df = engineer_features(test_df)

        X_train_df, y_train = extract_xy(train_df)
        X_test_df, y_test = extract_xy(test_df)

        X_train_df, self.label_encoders = fit_label_encoders(X_train_df, ORDINAL_COLUMNS)
        X_test_df = apply_label_encoders(X_test_df, self.label_encoders)

        self.feature_names = list(X_train_df.columns)
        X_test_df = align_feature_frame(X_test_df, self.feature_names)

        X_train = self.scaler.fit_transform(to_float_array(X_train_df))
        transformed_target = self.transform_target(y_train)
        X_train = self.select_features(X_train, transformed_target, k=self.feature_k)
        # transform_features aligns, scales, and applies the fitted selector.
        X_test = self.transform_features(X_test_df)

        self.training_metadata = {
            'train_samples': int(len(train_df)),
            'test_samples': int(len(test_df)),
            'feature_count_before_selection': int(len(self.feature_names)),
            'feature_count_after_selection': int(X_train.shape[1]),
            'training_date': datetime.now().strftime('%Y-%m-%d'),
            'target_transform': self.target_transform,
            'available_models': [],
            'deep_learning_available': False,
        }
        return X_train, X_test, y_train, y_test

    def _run_search(self, name, estimator, params, X_train, y_train, n_iter=12):
        start_time = time.time()
        search = RandomizedSearchCV(
            estimator,
            param_distributions=params,
            n_iter=n_iter,
            cv=4,
            scoring='r2',
            n_jobs=-1,
            random_state=config.RANDOM_STATE,
        )
        search.fit(X_train, y_train)
        self.models[name] = search.best_estimator_
        print_training_log(name, start_time, search.best_score_, search.best_params_, n_iter, 4)

    def train_random_forest(self, X_train, y_train):
        self._run_search(
            'random_forest',
            RandomForestRegressor(random_state=config.RANDOM_STATE, n_jobs=-1),
            {
                'n_estimators': [200, 300, 400],
                'max_depth': [10, 14, 18, None],
                'min_samples_split': [2, 4, 8],
                'min_samples_leaf': [1, 2, 3],
                'max_features': ['sqrt', 0.7],
            },
            X_train,
            y_train,
        )

    def train_gradient_boosting(self, X_train, y_train):
        self._run_search(
            'gradient_boosting',
            GradientBoostingRegressor(random_state=config.RANDOM_STATE),
            {
                'n_estimators': [160, 220, 300],
                'max_depth': [3, 4, 5],
                'learning_rate': [0.03, 0.05, 0.08],
                'subsample': [0.7, 0.85, 1.0],
                'min_samples_split': [2, 4, 6],
                'min_samples_leaf': [1, 2, 3],
            },
            X_train,
            y_train,
        )

    def train_extra_trees(self, X_train, y_train):
        self._run_search(
            'extra_trees',
            ExtraTreesRegressor(random_state=config.RANDOM_STATE, n_jobs=-1),
            {
                'n_estimators': [220, 320, 420],
                'max_depth': [10, 15, 20, None],
                'min_samples_split': [2, 4, 8],
                'min_samples_leaf': [1, 2, 3],
                'max_features': ['sqrt', 0.7],
            },
            X_train,
            y_train,
        )

    def train_lightgbm(self, X_train, y_train):
        if lgb is None:
            return
        try:
            self._run_search(
                'lightgbm',
                lgb.LGBMRegressor(random_state=config.RANDOM_STATE, n_jobs=-1, verbose=-1),
                {
                    'n_estimators': [180, 260, 340],
                    'max_depth': [7, 9, -1],
                    'learning_rate': [0.03, 0.05, 0.08],
                    'subsample': [0.7, 0.85, 1.0],
                    'colsample_bytree': [0.7, 0.85, 1.0],
                    'num_leaves': [31, 50, 70],
                },
                X_train,
                y_train,
            )
        except Exception as exc:
            print(f' {"-" * 50}')
            print(' Model: lightgbm')
            print(f' Skipped: {exc}')

    def train_xgboost(self, X_train, y_train):
        if xgb is None:
            return
        self._run_search(
            'xgboost',
            xgb.XGBRegressor(random_state=config.RANDOM_STATE, n_jobs=-1),
            {
                'n_estimators': [180, 260, 340],
                'max_depth': [4, 6, 8],
                'learning_rate': [0.03, 0.05, 0.08],
                'subsample': [0.7, 0.85, 1.0],
                'colsample_bytree': [0.7, 0.85, 1.0],
                'min_child_weight': [1, 3, 5],
            },
            X_train,
            y_train,
        )

    def evaluate_model(self, model, X_test, y_test):
        # Models are fit on the transformed target, so predictions are mapped
        # back to the original scale (and clipped to >= 0) before scoring.
        y_pred = self.inverse_transform_target(model.predict(X_test))
        y_pred = np.clip(y_pred, a_min=0, a_max=None)
        mse = mean_squared_error(y_test, y_pred)
        return {
            'r2': round(r2_score(y_test, y_pred), 4),
            'mse': round(mse, 4),
            'rmse': round(np.sqrt(mse), 4),
            'mae': round(mean_absolute_error(y_test, y_pred), 4),
        }

    def save_models(self):
        os.makedirs(config.MODELS_DIR, exist_ok=True)
        for name, model in self.models.items():
            joblib.dump(model, os.path.join(config.MODELS_DIR, f'{name}_model.pkl'))
        self.training_metadata['available_models'] = list(self.model_metrics.keys())
        joblib.dump(self.scaler, config.SCALER_PATH)
        joblib.dump(self.feature_names, os.path.join(config.MODELS_DIR, 'feature_names.pkl'))
        joblib.dump(self.selected_features, os.path.join(config.MODELS_DIR, 'selected_features.pkl'))
        joblib.dump(self.label_encoders, os.path.join(config.MODELS_DIR, 'label_encoders.pkl'))
        joblib.dump(self.model_metrics, os.path.join(config.MODELS_DIR, 'model_metrics.pkl'))
        joblib.dump(self.training_metadata, os.path.join(config.MODELS_DIR, 'training_metadata.pkl'))

    def train_all(self):
        print('\nOptimized Model Training Started')
        X_train, X_test, y_train, y_test = self.prepare_data()
        y_train_transformed = self.transform_target(y_train)

        if 'random_forest' in self.enabled_models:
            self.train_random_forest(X_train, y_train_transformed)
        if 'gradient_boosting' in self.enabled_models:
            self.train_gradient_boosting(X_train, y_train_transformed)
        if 'extra_trees' in self.enabled_models:
            self.train_extra_trees(X_train, y_train_transformed)
        if 'lightgbm' in self.enabled_models:
            self.train_lightgbm(X_train, y_train_transformed)
        if 'xgboost' in self.enabled_models:
            self.train_xgboost(X_train, y_train_transformed)

        for name, model in self.models.items():
            metrics = self.evaluate_model(model, X_test, y_test)
            self.model_metrics[name] = metrics
            print(f' {name:20s} R2={metrics["r2"]:.4f} RMSE={metrics["rmse"]:.4f} MAE={metrics["mae"]:.4f}')

        if 'lstm_mlp' in self.enabled_models and self.raw_train_df is not None and self.raw_test_df is not None:
            deep_model_path = os.path.join(config.MODELS_DIR, 'lstm_mlp_model.pt')
            deep_result = train_lstm_mlp(
                self.raw_train_df,
                self.raw_test_df,
                deep_model_path,
                target_transform=self.target_transform,
            )
            if deep_result:
                self.model_metrics['lstm_mlp'] = deep_result['metrics']
                self.training_metadata['deep_learning_available'] = True
                self.training_metadata.update(deep_result['metadata'])
                print(
                    f' {"lstm_mlp":20s} R2={deep_result["metrics"]["r2"]:.4f} '
                    f'RMSE={deep_result["metrics"]["rmse"]:.4f} MAE={deep_result["metrics"]["mae"]:.4f}'
                )

        self.save_models()
        return self.model_metrics
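
# Minimal inference sketch (illustrative; not invoked anywhere in this script):
# load the artifacts written by save_models() and score a feature frame. It
# assumes `frame` has already been through the same preprocessing as training
# (prepare_modeling_dataframe, outlier bounds, engineer_features, label
# encoding) and that target_transform was left at 'log1p'. The function name
# is a placeholder.
def predict_with_saved_model(frame, model_name='random_forest'):
    model = joblib.load(os.path.join(config.MODELS_DIR, f'{model_name}_model.pkl'))
    scaler = joblib.load(config.SCALER_PATH)
    feature_names = joblib.load(os.path.join(config.MODELS_DIR, 'feature_names.pkl'))
    selected = joblib.load(os.path.join(config.MODELS_DIR, 'selected_features.pkl'))
    aligned = align_feature_frame(frame, feature_names)
    X = scaler.transform(to_float_array(aligned))
    # save_models() stores the selected feature *names*, not the fitted
    # selector, so re-derive the kept column positions here.
    keep = [i for i, name in enumerate(feature_names) if name in selected]
    preds = model.predict(X[:, keep])
    return np.clip(np.expm1(preds), a_min=0, a_max=None)  # undo log1p, keep >= 0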

def train_and_save_models():
    start = time.time()
    trainer = OptimizedModelTrainer()
    metrics = trainer.train_all()
    print(f'\nTraining Complete in {time.time() - start:.1f}s')
    ranked = sorted(metrics.items(), key=lambda item: item[1]['r2'], reverse=True)
    for idx, (name, metric) in enumerate(ranked, start=1):
        print(f'{idx}. {name} - R2={metric["r2"]:.4f}')
    return metrics
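
# Small helper sketch (illustrative; not used above): pick the best model name
# from the metrics dict returned by train_and_save_models(), ranked by test R2.
def pick_best_model(metrics):
    return max(metrics, key=lambda name: metrics[name]['r2'])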

if __name__ == '__main__':
    train_and_save_models()
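
# Typical invocation (script path is illustrative; run from the project layout
# the sys.path insert at the top expects, so `config` and `core` resolve):
#   python scripts/train_models.py
# Trained estimators and preprocessing artifacts are written to config.MODELS_DIR.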