fix: generate_evaluation_plots

This commit is contained in:
2026-03-20 17:01:30 +08:00
parent cc85e3807a
commit 1e1d4b0d17

View File

@@ -0,0 +1,330 @@
import json
import os
from pathlib import Path
import joblib
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
import config
from core.deep_learning_model import (
_build_sequence_arrays,
load_lstm_mlp_bundle,
)
from core.model_features import (
NUMERICAL_OUTLIER_COLUMNS,
TARGET_COLUMN,
apply_outlier_bounds,
engineer_features,
fit_outlier_bounds,
make_target_bins,
normalize_columns,
prepare_modeling_dataframe,
)
from core.preprocessing import get_clean_data
# Prefer CJK-capable fonts so the Chinese figure labels render; the list is a
# fallback chain ending in matplotlib's bundled DejaVu Sans.
matplotlib.rcParams['font.sans-serif'] = [
    'Microsoft YaHei',
    'SimHei',
    'Noto Sans CJK SC',
    'Arial Unicode MS',
    'DejaVu Sans',
]
# Keep the minus sign rendering correctly when a CJK font is active.
matplotlib.rcParams['axes.unicode_minus'] = False

# Resolved input/output locations for all evaluation artifacts.
BASE_DIR = Path(config.BASE_DIR)
MODELS_DIR = Path(config.MODELS_DIR)
OUTPUT_DIR = BASE_DIR / 'outputs' / 'eval_figures'
PREDICTION_CSV = OUTPUT_DIR / 'lstm_predictions.csv'
SUMMARY_JSON = OUTPUT_DIR / 'evaluation_summary.json'

# Internal model key -> Chinese display name used in figure titles/axes.
MODEL_DISPLAY_NAMES = {
    'lstm_mlp': '时序注意力融合网络',
    'xgboost': 'XGBoost',
    'gradient_boosting': 'GBDT',
    'random_forest': '随机森林',
    'extra_trees': '极端随机树',
    'lightgbm': 'LightGBM',
}
def ensure_output_dir():
    """Create the evaluation-figures directory (and parents) if missing."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
def load_metrics():
    """Load the persisted per-model metrics dict, ordered best R2 first.

    Raises FileNotFoundError when the metrics pickle has not been produced yet.
    """
    metrics_path = MODELS_DIR / 'model_metrics.pkl'
    if not metrics_path.exists():
        raise FileNotFoundError(f'未找到模型评估文件: {metrics_path}')
    metrics = joblib.load(metrics_path)
    # Missing 'r2' entries sort last via the -999 sentinel.
    ranked = sorted(metrics.items(), key=lambda item: item[1].get('r2', -999), reverse=True)
    return dict(ranked)
def get_test_split():
    """Split the cleaned dataset into train/test frames.

    Stratifies on binned target values so both splits share the target
    distribution; returns (train_df, test_df) with fresh integer indexes.
    """
    full_df = normalize_columns(get_clean_data())
    strata = make_target_bins(full_df[TARGET_COLUMN].values)
    train_part, test_part = train_test_split(
        full_df,
        test_size=config.TEST_SIZE,
        random_state=config.RANDOM_STATE,
        stratify=strata,
    )
    return train_part.reset_index(drop=True), test_part.reset_index(drop=True)
def classify_risk(values):
    """Map absence-hour values to risk labels.

    <4 hours -> '低风险', 4..8 hours (inclusive) -> '中风险', >8 -> '高风险'.
    Accepts any array-like; returns a numpy array of labels.
    """
    hours = np.asarray(values, dtype=float)
    low_or_medium = np.where(hours < 4, '低风险', '中风险')
    return np.where(hours > 8, '高风险', low_or_medium)
def load_lstm_predictions():
    """Run the saved LSTM+MLP bundle on the held-out test split.

    Rebuilds the exact train-time preprocessing (feature engineering,
    train-fitted outlier clipping, sequence-array construction,
    standardization with the bundle's stored statistics), predicts on the
    test split, undoes the target transform, and writes the prediction
    table to PREDICTION_CSV.

    Returns:
        pd.DataFrame with columns 真实值/预测值/残差/真实风险等级/预测风险等级.

    Raises:
        FileNotFoundError: model file missing.
        RuntimeError: bundle failed to load.
    """
    model_path = MODELS_DIR / 'lstm_mlp_model.pt'
    if not model_path.exists():
        raise FileNotFoundError(f'未找到深度学习模型文件: {model_path}')
    bundle = load_lstm_mlp_bundle(str(model_path))
    if bundle is None:
        raise RuntimeError('无法加载深度学习模型,请确认 torch 环境和模型文件正常。')
    raw_train_df, raw_test_df = get_test_split()
    fit_df = prepare_modeling_dataframe(raw_train_df)
    test_df = prepare_modeling_dataframe(raw_test_df)
    # Outlier bounds are fit on the train split only, then applied to both —
    # mirrors training and avoids leaking test statistics.
    outlier_bounds = fit_outlier_bounds(fit_df, NUMERICAL_OUTLIER_COLUMNS)
    fit_df = apply_outlier_bounds(fit_df, outlier_bounds)
    test_df = apply_outlier_bounds(test_df, outlier_bounds)
    feature_layout = bundle['feature_layout']
    category_maps = bundle['category_maps']
    target_transform = bundle['target_transform']
    # NOTE(review): the result of this train-split call is discarded —
    # presumably invoked for a side effect or as a sanity check that the
    # train split builds under the saved layout; confirm whether it can go.
    _, _, _, _, _ = _build_sequence_arrays(
        fit_df,
        feature_layout,
        category_maps,
        target_transform,
    )
    test_seq_num, test_seq_cat, test_static_num, test_static_cat, y_test = _build_sequence_arrays(
        test_df,
        feature_layout,
        category_maps,
        target_transform,
    )
    # Standardize numeric inputs with the train-time mean/std stored in the bundle.
    test_seq_num = ((test_seq_num - bundle['seq_mean']) / bundle['seq_std']).astype(np.float32)
    test_static_num = ((test_static_num - bundle['static_mean']) / bundle['static_std']).astype(np.float32)
    import torch  # deferred import — presumably so the module imports without torch; confirm
    model = bundle['model']
    model.eval()
    with torch.no_grad():
        predictions = model(
            torch.tensor(test_seq_num, dtype=torch.float32),
            torch.tensor(test_seq_cat, dtype=torch.long),
            torch.tensor(test_static_num, dtype=torch.float32),
            torch.tensor(test_static_cat, dtype=torch.long),
        ).cpu().numpy()
    # Invert the log1p target transform when the model trained in log space.
    if target_transform == 'log1p':
        y_true = np.expm1(y_test)
        y_pred = np.expm1(predictions)
    else:
        y_true = y_test
        y_pred = predictions
    y_true = np.asarray(y_true, dtype=float)
    # Absence hours cannot be negative; clip predictions at zero.
    y_pred = np.clip(np.asarray(y_pred, dtype=float), a_min=0.0, a_max=None)
    residuals = y_pred - y_true
    prediction_df = pd.DataFrame({
        '真实值': np.round(y_true, 4),
        '预测值': np.round(y_pred, 4),
        '残差': np.round(residuals, 4),
        '真实风险等级': classify_risk(y_true),
        '预测风险等级': classify_risk(y_pred),
    })
    # utf-8-sig keeps the Chinese headers readable when opened in Excel.
    prediction_df.to_csv(PREDICTION_CSV, index=False, encoding='utf-8-sig')
    return prediction_df
def plot_model_comparison(metrics):
    """Draw side-by-side bar charts comparing R2 / RMSE / MAE across models.

    Args:
        metrics: dict keyed by internal model name; each value holds at least
            'r2', 'rmse' and 'mae' entries. Iteration order determines bar order.

    Saves the figure to OUTPUT_DIR as '01_模型性能对比.png'.
    """
    model_keys = list(metrics.keys())
    model_names = [MODEL_DISPLAY_NAMES.get(name, name) for name in model_keys]
    # Highlight the deep-learning model; baselines share a neutral gray.
    bar_colors = ['#0f766e' if name == 'lstm_mlp' else '#94a3b8' for name in model_keys]
    # One (metric key, title, y-label) tuple per panel — replaces the three
    # copy-pasted axis blocks in the original.
    panels = [
        ('r2', '模型R2对比', 'R2'),
        ('rmse', '模型RMSE对比', 'RMSE'),
        ('mae', '模型MAE对比', 'MAE'),
    ]
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    for ax, (metric_key, title, ylabel) in zip(axes, panels):
        ax.bar(model_names, [metrics[name][metric_key] for name in model_keys], color=bar_colors)
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.tick_params(axis='x', rotation=20)
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '01_模型性能对比.png', dpi=220, bbox_inches='tight')
    plt.close(fig)
def plot_actual_vs_pred(prediction_df):
    """Scatter predicted vs. true absence hours with a y=x reference line."""
    actual = prediction_df['真实值'].to_numpy()
    predicted = prediction_df['预测值'].to_numpy()
    axis_limit = max(float(actual.max()), float(predicted.max()))
    fig, ax = plt.subplots(figsize=(7, 7))
    ax.set_title('LSTM模型真实值与预测值对比')
    ax.set_xlabel('真实缺勤时长(小时)')
    ax.set_ylabel('预测缺勤时长(小时)')
    ax.scatter(actual, predicted, s=18, alpha=0.55, color='#0f766e', edgecolors='none')
    # Dashed diagonal marks a perfect prediction.
    ax.plot([0, axis_limit], [0, axis_limit], color='#dc2626', linestyle='--', linewidth=1.5)
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '02_LSTM真实值_vs_预测值.png', dpi=220, bbox_inches='tight')
    plt.close(fig)
def plot_residuals(prediction_df):
    """Plot residual histogram and residual-vs-prediction scatter side by side."""
    predicted = prediction_df['预测值'].to_numpy()
    errors = prediction_df['残差'].to_numpy()
    fig, (hist_ax, scatter_ax) = plt.subplots(1, 2, figsize=(14, 5))
    # Left panel: residual distribution centered on zero.
    hist_ax.hist(errors, bins=30, color='#2563eb', alpha=0.85, edgecolor='white')
    hist_ax.axvline(0, color='#dc2626', linestyle='--', linewidth=1.2)
    hist_ax.set_title('LSTM残差分布')
    hist_ax.set_xlabel('残差(预测值 - 真实值)')
    hist_ax.set_ylabel('样本数')
    # Right panel: residuals against predictions to expose heteroscedasticity.
    scatter_ax.scatter(predicted, errors, s=18, alpha=0.55, color='#7c3aed', edgecolors='none')
    scatter_ax.axhline(0, color='#dc2626', linestyle='--', linewidth=1.2)
    scatter_ax.set_title('LSTM残差散点图')
    scatter_ax.set_xlabel('预测缺勤时长(小时)')
    scatter_ax.set_ylabel('残差')
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '03_LSTM残差分析.png', dpi=220, bbox_inches='tight')
    plt.close(fig)
def plot_confusion_matrix(prediction_df):
    """Render the 3-class risk-level confusion matrix as an annotated heatmap."""
    labels = ['低风险', '中风险', '高风险']
    cm = confusion_matrix(
        prediction_df['真实风险等级'],
        prediction_df['预测风险等级'],
        labels=labels,
    )
    fig, ax = plt.subplots(figsize=(6, 5))
    heatmap = ax.imshow(cm, cmap='GnBu')
    ax.set_title('LSTM风险等级混淆矩阵')
    ax.set_xlabel('预测类别')
    ax.set_ylabel('真实类别')
    tick_positions = range(len(labels))
    ax.set_xticks(tick_positions)
    ax.set_xticklabels(labels)
    ax.set_yticks(tick_positions)
    ax.set_yticklabels(labels)
    # Annotate every cell with its raw count for readability.
    for (row, col), count in np.ndenumerate(cm):
        ax.text(col, row, int(count), ha='center', va='center', color='#111827')
    fig.colorbar(heatmap, ax=ax, fraction=0.046, pad=0.04)
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '04_LSTM风险等级混淆矩阵.png', dpi=220, bbox_inches='tight')
    plt.close(fig)
def plot_feature_importance():
    """Plot a Top-15 importance bar chart for the first usable tree model.

    Candidate model files are tried in priority order; the first one that
    exists and exposes feature_importances_ is plotted. Returns that model's
    internal name, or None when no candidate qualifies.
    """
    candidates = [
        ('xgboost', MODELS_DIR / 'xgboost_model.pkl'),
        ('random_forest', MODELS_DIR / 'random_forest_model.pkl'),
        ('extra_trees', MODELS_DIR / 'extra_trees_model.pkl'),
    ]
    selected_path = MODELS_DIR / 'selected_features.pkl'
    names_path = MODELS_DIR / 'feature_names.pkl'
    selected_features = joblib.load(selected_path) if selected_path.exists() else None
    feature_names = joblib.load(names_path) if names_path.exists() else None
    for model_name, model_path in candidates:
        if not model_path.exists():
            continue
        model = joblib.load(model_path)
        if not hasattr(model, 'feature_importances_'):
            continue
        importances = model.feature_importances_
        # Prefer the saved feature lists; fall back to generic placeholders
        # whenever the lengths do not line up with the importances.
        names = selected_features or feature_names or [f'feature_{idx}' for idx in range(len(importances))]
        if len(names) != len(importances):
            names = [f'feature_{idx}' for idx in range(len(importances))]
        ranked = sorted(zip(names, importances), key=lambda pair: pair[1], reverse=True)[:15]
        # Reverse so the most important feature lands at the top of the barh chart.
        ranked = ranked[::-1]
        display_labels = [config.FEATURE_NAME_CN.get(name, name) for name, _ in ranked]
        bar_values = [float(value) for _, value in ranked]
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.barh(display_labels, bar_values, color='#0f766e')
        ax.set_title(f'{MODEL_DISPLAY_NAMES.get(model_name, model_name)}特征重要性 Top15')
        ax.set_xlabel('重要性')
        fig.tight_layout()
        fig.savefig(OUTPUT_DIR / '05_特征重要性_Top15.png', dpi=220, bbox_inches='tight')
        plt.close(fig)
        return model_name
    return None
def save_summary(metrics, prediction_df, feature_model_name):
    """Write the evaluation summary JSON to SUMMARY_JSON.

    Covers the per-model metrics, LSTM residual statistics, risk-level
    classification accuracy, and the list of generated output files.
    """
    residuals = prediction_df['残差'].to_numpy()
    risk_hits = prediction_df['真实风险等级'] == prediction_df['预测风险等级']
    summary = {
        # metrics is pre-sorted best-first, so the first key is the best model.
        'best_model': next(iter(metrics.keys())),
        'metrics': metrics,
        'lstm_prediction_summary': {
            'prediction_count': int(len(prediction_df)),
            'residual_mean': round(float(residuals.mean()), 4),
            'residual_std': round(float(residuals.std()), 4),
            'risk_accuracy': round(float(risk_hits.mean()), 4),
        },
        'feature_importance_model': feature_model_name,
        'generated_files': sorted(file.name for file in OUTPUT_DIR.iterdir() if file.is_file()),
    }
    SUMMARY_JSON.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding='utf-8')
def main():
    """Generate all evaluation figures, the prediction CSV and the summary JSON."""
    ensure_output_dir()
    metrics = load_metrics()
    prediction_df = load_lstm_predictions()
    plot_model_comparison(metrics)
    # The remaining figures all consume the same prediction table.
    for render in (plot_actual_vs_pred, plot_residuals, plot_confusion_matrix):
        render(prediction_df)
    feature_model_name = plot_feature_importance()
    save_summary(metrics, prediction_df, feature_model_name)
    print(f'评估图片已生成: {OUTPUT_DIR}')
    print(f'LSTM预测明细: {PREDICTION_CSV}')
    print(f'评估摘要: {SUMMARY_JSON}')


if __name__ == '__main__':
    main()