"""Generate evaluation figures and a JSON summary for the trained models.

Loads persisted model metrics and the LSTM/MLP bundle, rebuilds the held-out
test split, computes predictions, and writes comparison/residual/confusion
figures plus a machine-readable summary to ``outputs/eval_figures``.

NOTE(review): the original file's physical formatting was destroyed (all
statements collapsed onto a few lines); this is a faithful reconstruction
with identical statement order, identical runtime strings, and added
documentation.
"""

import json
import os
import sys
from pathlib import Path

import joblib
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split

# Make the project root importable when this file is run as a script.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import config
from core.deep_learning_model import (
    _build_sequence_arrays,
    load_lstm_mlp_bundle,
)
from core.model_features import (
    NUMERICAL_OUTLIER_COLUMNS,
    TARGET_COLUMN,
    apply_outlier_bounds,
    engineer_features,
    fit_outlier_bounds,
    make_target_bins,
    normalize_columns,
)
from core.preprocessing import get_clean_data

# CJK-capable font fallbacks so Chinese axis labels/titles render; the
# unicode-minus glyph is disabled because most CJK fonts lack it.
matplotlib.rcParams['font.sans-serif'] = [
    'Microsoft YaHei',
    'SimHei',
    'Noto Sans CJK SC',
    'Arial Unicode MS',
    'DejaVu Sans',
]
matplotlib.rcParams['axes.unicode_minus'] = False

BASE_DIR = Path(config.BASE_DIR)
MODELS_DIR = Path(config.MODELS_DIR)
OUTPUT_DIR = BASE_DIR / 'outputs' / 'eval_figures'
PREDICTION_CSV = OUTPUT_DIR / 'lstm_predictions.csv'
SUMMARY_JSON = OUTPUT_DIR / 'evaluation_summary.json'

# Internal model keys -> display names used in figure labels.
MODEL_DISPLAY_NAMES = {
    'lstm_mlp': '时序注意力融合网络',
    'xgboost': 'XGBoost',
    'gradient_boosting': 'GBDT',
    'random_forest': '随机森林',
    'extra_trees': '极端随机树',
    'lightgbm': 'LightGBM',
}


def ensure_output_dir():
    """Create the figure output directory (and parents) if missing."""
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)


def load_metrics():
    """Load persisted per-model metrics, ordered best-R2 first.

    Returns:
        dict: model name -> metrics dict, sorted by ``r2`` descending;
        entries lacking an ``r2`` key sort to the bottom (sentinel -999).

    Raises:
        FileNotFoundError: if ``model_metrics.pkl`` is absent.
    """
    metrics_path = MODELS_DIR / 'model_metrics.pkl'
    if not metrics_path.exists():
        raise FileNotFoundError(f'未找到模型评估文件: {metrics_path}')
    metrics = joblib.load(metrics_path)
    return dict(
        sorted(metrics.items(), key=lambda item: item[1].get('r2', -999), reverse=True)
    )


def get_test_split():
    """Rebuild the train/test split used during training.

    Uses the same ``test_size``/``random_state`` from ``config`` and
    stratifies on binned target values so the split matches training.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: (train_df, test_df), both with
        reset indices.
    """
    raw_df = normalize_columns(get_clean_data())
    target_bins = make_target_bins(raw_df[TARGET_COLUMN].values)
    raw_train_df, raw_test_df = train_test_split(
        raw_df,
        test_size=config.TEST_SIZE,
        random_state=config.RANDOM_STATE,
        stratify=target_bins,
    )
    return raw_train_df.reset_index(drop=True), raw_test_df.reset_index(drop=True)


def classify_risk(values):
    """Map absence-hour values to risk labels.

    Thresholds: < 4 -> 低风险, 4..8 inclusive -> 中风险, > 8 -> 高风险.
    """
    values = np.asarray(values, dtype=float)
    return np.where(values < 4, '低风险', np.where(values <= 8, '中风险', '高风险'))


def load_lstm_predictions():
    """Run the persisted LSTM/MLP bundle on the test split.

    Writes the per-sample prediction detail CSV (``PREDICTION_CSV``) and
    returns it as a DataFrame with true/predicted values, residuals, and
    risk-level labels.

    Raises:
        FileNotFoundError: if the model file is absent.
        RuntimeError: if the bundle cannot be loaded (e.g. torch missing).
    """
    model_path = MODELS_DIR / 'lstm_mlp_model.pt'
    if not model_path.exists():
        raise FileNotFoundError(f'未找到深度学习模型文件: {model_path}')
    bundle = load_lstm_mlp_bundle(str(model_path))
    if bundle is None:
        raise RuntimeError('无法加载深度学习模型,请确认 torch 环境和模型文件正常。')

    raw_train_df, raw_test_df = get_test_split()
    # Fit outlier clipping bounds on train only, then apply to both splits.
    outlier_bounds = fit_outlier_bounds(raw_train_df, NUMERICAL_OUTLIER_COLUMNS)
    fit_df = apply_outlier_bounds(raw_train_df, outlier_bounds)
    test_df = apply_outlier_bounds(raw_test_df, outlier_bounds)

    feature_layout = bundle['feature_layout']
    category_maps = bundle['category_maps']
    target_transform = bundle['target_transform']

    # NOTE(review): result is discarded; kept in case _build_sequence_arrays
    # has side effects (e.g. mutating category_maps) — confirm and remove
    # if it is a pure function.
    _, _, _, _, _ = _build_sequence_arrays(
        fit_df,
        feature_layout,
        category_maps,
        target_transform,
    )
    test_seq_num, test_seq_cat, test_static_num, test_static_cat, y_test = (
        _build_sequence_arrays(
            test_df,
            feature_layout,
            category_maps,
            target_transform,
        )
    )
    # Standardize numeric inputs with the train-time statistics stored in
    # the bundle.
    test_seq_num = ((test_seq_num - bundle['seq_mean']) / bundle['seq_std']).astype(np.float32)
    test_static_num = (
        (test_static_num - bundle['static_mean']) / bundle['static_std']
    ).astype(np.float32)

    import torch

    model = bundle['model']
    model.eval()
    with torch.no_grad():
        predictions = model(
            torch.tensor(test_seq_num, dtype=torch.float32),
            torch.tensor(test_seq_cat, dtype=torch.long),
            torch.tensor(test_static_num, dtype=torch.float32),
            torch.tensor(test_static_cat, dtype=torch.long),
        ).cpu().numpy()

    # Undo the training-time target transform, if any.
    if target_transform == 'log1p':
        y_true = np.expm1(y_test)
        y_pred = np.expm1(predictions)
    else:
        y_true = y_test
        y_pred = predictions

    y_true = np.asarray(y_true, dtype=float)
    # Absence hours cannot be negative; clip predictions at zero.
    y_pred = np.clip(np.asarray(y_pred, dtype=float), a_min=0.0, a_max=None)
    residuals = y_pred - y_true

    prediction_df = pd.DataFrame({
        '真实值': np.round(y_true, 4),
        '预测值': np.round(y_pred, 4),
        '残差': np.round(residuals, 4),
        '真实风险等级': classify_risk(y_true),
        '预测风险等级': classify_risk(y_pred),
    })
    # utf-8-sig so Excel opens the Chinese headers correctly.
    prediction_df.to_csv(PREDICTION_CSV, index=False, encoding='utf-8-sig')
    return prediction_df


def plot_model_comparison(metrics):
    """Save a 3-panel bar chart comparing R2 / RMSE / MAE across models.

    The deep-learning model ('lstm_mlp') is highlighted in a distinct color.
    """
    model_names = [MODEL_DISPLAY_NAMES.get(name, name) for name in metrics.keys()]
    r2_values = [metrics[name]['r2'] for name in metrics]
    rmse_values = [metrics[name]['rmse'] for name in metrics]
    mae_values = [metrics[name]['mae'] for name in metrics]

    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    bar_colors = ['#0f766e' if name == 'lstm_mlp' else '#94a3b8' for name in metrics.keys()]

    axes[0].bar(model_names, r2_values, color=bar_colors)
    axes[0].set_title('模型R2对比')
    axes[0].set_ylabel('R2')
    axes[0].tick_params(axis='x', rotation=20)

    axes[1].bar(model_names, rmse_values, color=bar_colors)
    axes[1].set_title('模型RMSE对比')
    axes[1].set_ylabel('RMSE')
    axes[1].tick_params(axis='x', rotation=20)

    axes[2].bar(model_names, mae_values, color=bar_colors)
    axes[2].set_title('模型MAE对比')
    axes[2].set_ylabel('MAE')
    axes[2].tick_params(axis='x', rotation=20)

    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '01_模型性能对比.png', dpi=220, bbox_inches='tight')
    plt.close(fig)


def plot_actual_vs_pred(prediction_df):
    """Save a scatter plot of true vs. predicted values with a y=x line."""
    y_true = prediction_df['真实值'].to_numpy()
    y_pred = prediction_df['预测值'].to_numpy()
    max_value = max(float(y_true.max()), float(y_pred.max()))

    fig, ax = plt.subplots(figsize=(7, 7))
    ax.scatter(y_true, y_pred, s=18, alpha=0.55, color='#0f766e', edgecolors='none')
    # Perfect-prediction reference line.
    ax.plot([0, max_value], [0, max_value], color='#dc2626', linestyle='--', linewidth=1.5)
    ax.set_title('LSTM模型真实值与预测值对比')
    ax.set_xlabel('真实缺勤时长(小时)')
    ax.set_ylabel('预测缺勤时长(小时)')
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '02_LSTM真实值_vs_预测值.png', dpi=220, bbox_inches='tight')
    plt.close(fig)


def plot_residuals(prediction_df):
    """Save residual histogram and residual-vs-prediction scatter."""
    y_pred = prediction_df['预测值'].to_numpy()
    residuals = prediction_df['残差'].to_numpy()

    fig, axes = plt.subplots(1, 2, figsize=(14, 5))

    axes[0].hist(residuals, bins=30, color='#2563eb', alpha=0.85, edgecolor='white')
    axes[0].axvline(0, color='#dc2626', linestyle='--', linewidth=1.2)
    axes[0].set_title('LSTM残差分布')
    axes[0].set_xlabel('残差(预测值 - 真实值)')
    axes[0].set_ylabel('样本数')

    axes[1].scatter(y_pred, residuals, s=18, alpha=0.55, color='#7c3aed', edgecolors='none')
    axes[1].axhline(0, color='#dc2626', linestyle='--', linewidth=1.2)
    axes[1].set_title('LSTM残差散点图')
    axes[1].set_xlabel('预测缺勤时长(小时)')
    axes[1].set_ylabel('残差')

    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '03_LSTM残差分析.png', dpi=220, bbox_inches='tight')
    plt.close(fig)


def plot_confusion_matrix(prediction_df):
    """Save a confusion matrix of true vs. predicted risk levels."""
    labels = ['低风险', '中风险', '高风险']
    cm = confusion_matrix(
        prediction_df['真实风险等级'],
        prediction_df['预测风险等级'],
        labels=labels,
    )

    fig, ax = plt.subplots(figsize=(6, 5))
    image = ax.imshow(cm, cmap='GnBu')
    ax.set_title('LSTM风险等级混淆矩阵')
    ax.set_xlabel('预测类别')
    ax.set_ylabel('真实类别')
    ax.set_xticks(range(len(labels)))
    ax.set_xticklabels(labels)
    ax.set_yticks(range(len(labels)))
    ax.set_yticklabels(labels)
    # Annotate each cell with its count.
    for row in range(cm.shape[0]):
        for col in range(cm.shape[1]):
            ax.text(col, row, int(cm[row, col]), ha='center', va='center', color='#111827')
    fig.colorbar(image, ax=ax, fraction=0.046, pad=0.04)
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '04_LSTM风险等级混淆矩阵.png', dpi=220, bbox_inches='tight')
    plt.close(fig)


def plot_feature_importance():
    """Save a Top-15 feature-importance chart from the first available tree model.

    Tries candidate models in priority order (xgboost, random_forest,
    extra_trees) and uses the first one on disk that exposes
    ``feature_importances_``.

    Returns:
        str | None: the internal name of the model used, or None if no
        candidate model was found.
    """
    candidate_files = [
        ('xgboost', MODELS_DIR / 'xgboost_model.pkl'),
        ('random_forest', MODELS_DIR / 'random_forest_model.pkl'),
        ('extra_trees', MODELS_DIR / 'extra_trees_model.pkl'),
    ]
    selected_features_path = MODELS_DIR / 'selected_features.pkl'
    feature_names_path = MODELS_DIR / 'feature_names.pkl'
    selected_features = (
        joblib.load(selected_features_path) if selected_features_path.exists() else None
    )
    feature_names = joblib.load(feature_names_path) if feature_names_path.exists() else None

    for model_name, model_path in candidate_files:
        if not model_path.exists():
            continue
        model = joblib.load(model_path)
        if not hasattr(model, 'feature_importances_'):
            continue
        importances = model.feature_importances_
        # Prefer the persisted feature lists; fall back to generic names,
        # also when the persisted list does not match the importance vector.
        names = (
            selected_features
            or feature_names
            or [f'feature_{idx}' for idx in range(len(importances))]
        )
        if len(names) != len(importances):
            names = [f'feature_{idx}' for idx in range(len(importances))]

        top_items = sorted(zip(names, importances), key=lambda item: item[1], reverse=True)[:15]
        # barh plots bottom-up, so reverse to show the largest bar on top.
        top_items.reverse()

        fig, ax = plt.subplots(figsize=(8, 6))
        ax.barh(
            [config.FEATURE_NAME_CN.get(name, name) for name, _ in top_items],
            [float(value) for _, value in top_items],
            color='#0f766e',
        )
        ax.set_title(f'{MODEL_DISPLAY_NAMES.get(model_name, model_name)}特征重要性 Top15')
        ax.set_xlabel('重要性')
        fig.tight_layout()
        fig.savefig(OUTPUT_DIR / '05_特征重要性_Top15.png', dpi=220, bbox_inches='tight')
        plt.close(fig)
        return model_name
    return None


def save_summary(metrics, prediction_df, feature_model_name):
    """Write the JSON evaluation summary (best model, metrics, residual stats)."""
    residuals = prediction_df['残差'].to_numpy()
    summary = {
        # metrics is sorted best-first by load_metrics().
        'best_model': next(iter(metrics.keys())),
        'metrics': metrics,
        'lstm_prediction_summary': {
            'prediction_count': int(len(prediction_df)),
            'residual_mean': round(float(residuals.mean()), 4),
            'residual_std': round(float(residuals.std()), 4),
            'risk_accuracy': round(
                float((prediction_df['真实风险等级'] == prediction_df['预测风险等级']).mean()),
                4,
            ),
        },
        'feature_importance_model': feature_model_name,
        'generated_files': sorted(
            [file.name for file in OUTPUT_DIR.iterdir() if file.is_file()]
        ),
    }
    SUMMARY_JSON.write_text(
        json.dumps(summary, ensure_ascii=False, indent=2), encoding='utf-8'
    )


def main():
    """Run the full evaluation pipeline and report output locations."""
    ensure_output_dir()
    metrics = load_metrics()
    prediction_df = load_lstm_predictions()
    plot_model_comparison(metrics)
    plot_actual_vs_pred(prediction_df)
    plot_residuals(prediction_df)
    plot_confusion_matrix(prediction_df)
    feature_model_name = plot_feature_importance()
    save_summary(metrics, prediction_df, feature_model_name)
    print(f'评估图片已生成: {OUTPUT_DIR}')
    print(f'LSTM预测明细: {PREDICTION_CSV}')
    print(f'评估摘要: {SUMMARY_JSON}')


if __name__ == '__main__':
    main()