# File: forsetsystem/backend/core/generate_evaluation_plots.py
# Snapshot: 2026-03-20 17:03:27 +08:00 — 334 lines, 12 KiB, Python
import json
import os
import sys
from pathlib import Path
import joblib
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import config
from core.deep_learning_model import (
_build_sequence_arrays,
load_lstm_mlp_bundle,
)
from core.model_features import (
NUMERICAL_OUTLIER_COLUMNS,
TARGET_COLUMN,
apply_outlier_bounds,
engineer_features,
fit_outlier_bounds,
make_target_bins,
normalize_columns,
prepare_modeling_dataframe,
)
from core.preprocessing import get_clean_data
# Font fallback chain: CJK-capable fonts first so Chinese plot labels render,
# ending with matplotlib's bundled DejaVu Sans as a last resort.
matplotlib.rcParams['font.sans-serif'] = [
    'Microsoft YaHei',
    'SimHei',
    'Noto Sans CJK SC',
    'Arial Unicode MS',
    'DejaVu Sans',
]
# Render the minus sign as ASCII so it displays correctly with CJK fonts.
matplotlib.rcParams['axes.unicode_minus'] = False

# Filesystem layout: trained models are read from MODELS_DIR; all generated
# artifacts (figures, prediction CSV, summary JSON) go under OUTPUT_DIR.
BASE_DIR = Path(config.BASE_DIR)
MODELS_DIR = Path(config.MODELS_DIR)
OUTPUT_DIR = BASE_DIR / 'outputs' / 'eval_figures'
PREDICTION_CSV = OUTPUT_DIR / 'lstm_predictions.csv'
SUMMARY_JSON = OUTPUT_DIR / 'evaluation_summary.json'

# Internal model keys -> human-readable (Chinese) names shown on plots.
MODEL_DISPLAY_NAMES = {
    'lstm_mlp': '时序注意力融合网络',
    'xgboost': 'XGBoost',
    'gradient_boosting': 'GBDT',
    'random_forest': '随机森林',
    'extra_trees': '极端随机树',
    'lightgbm': 'LightGBM',
}
def ensure_output_dir():
    """Create the evaluation-figure output directory if it does not exist."""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
def load_metrics():
    """Load persisted per-model metrics, ordered best-R² first.

    Raises:
        FileNotFoundError: when the metrics pickle is missing.
    """
    metrics_path = MODELS_DIR / 'model_metrics.pkl'
    if not metrics_path.exists():
        raise FileNotFoundError(f'未找到模型评估文件: {metrics_path}')
    metrics = joblib.load(metrics_path)
    # Models lacking an 'r2' entry sort to the bottom via the -999 sentinel.
    ranked = sorted(metrics.items(), key=lambda kv: kv[1].get('r2', -999), reverse=True)
    return dict(ranked)
def get_test_split():
    """Reproduce the deterministic train/test split used at training time.

    Stratifies on binned target values so both splits keep a similar target
    distribution. Returns (train_df, test_df), each with a fresh 0..n index.
    """
    full_df = normalize_columns(get_clean_data())
    strata = make_target_bins(full_df[TARGET_COLUMN].values)
    train_part, test_part = train_test_split(
        full_df,
        test_size=config.TEST_SIZE,
        random_state=config.RANDOM_STATE,
        stratify=strata,
    )
    return train_part.reset_index(drop=True), test_part.reset_index(drop=True)
def classify_risk(values):
    """Map absenteeism-hour values to risk labels: <4 low, 4–8 mid, >8 high."""
    hours = np.asarray(values, dtype=float)
    mid_or_high = np.where(hours <= 8, '中风险', '高风险')
    return np.where(hours < 4, '低风险', mid_or_high)
def load_lstm_predictions():
    """Evaluate the persisted LSTM+MLP bundle on the held-out test split.

    Rebuilds the preprocessing pipeline on the train split, runs inference
    on the test split, inverts the target transform, writes per-sample
    results to PREDICTION_CSV, and returns them as a DataFrame.

    Raises:
        FileNotFoundError: when the model file is missing.
        RuntimeError: when the bundle cannot be loaded.
    """
    model_path = MODELS_DIR / 'lstm_mlp_model.pt'
    if not model_path.exists():
        raise FileNotFoundError(f'未找到深度学习模型文件: {model_path}')
    bundle = load_lstm_mlp_bundle(str(model_path))
    if bundle is None:
        raise RuntimeError('无法加载深度学习模型,请确认 torch 环境和模型文件正常。')
    raw_train_df, raw_test_df = get_test_split()
    fit_df = prepare_modeling_dataframe(raw_train_df)
    test_df = prepare_modeling_dataframe(raw_test_df)
    # Outlier bounds are fitted on the train split only, then applied to both
    # splits — presumably mirroring the training pipeline to avoid leakage;
    # confirm against the training code.
    outlier_bounds = fit_outlier_bounds(fit_df, NUMERICAL_OUTLIER_COLUMNS)
    fit_df = apply_outlier_bounds(fit_df, outlier_bounds)
    test_df = apply_outlier_bounds(test_df, outlier_bounds)
    feature_layout = bundle['feature_layout']
    category_maps = bundle['category_maps']
    target_transform = bundle['target_transform']
    # NOTE(review): every output of this call is discarded. Presumably kept
    # for a side effect inside _build_sequence_arrays — confirm; if there is
    # none, this call can be removed.
    _, _, _, _, _ = _build_sequence_arrays(
        fit_df,
        feature_layout,
        category_maps,
        target_transform,
    )
    test_seq_num, test_seq_cat, test_static_num, test_static_cat, y_test = _build_sequence_arrays(
        test_df,
        feature_layout,
        category_maps,
        target_transform,
    )
    # Standardize numeric inputs with the mean/std stats stored in the bundle.
    test_seq_num = ((test_seq_num - bundle['seq_mean']) / bundle['seq_std']).astype(np.float32)
    test_static_num = ((test_static_num - bundle['static_mean']) / bundle['static_std']).astype(np.float32)
    import torch  # local import — presumably deferred so torch is only required here
    model = bundle['model']
    model.eval()
    with torch.no_grad():
        predictions = model(
            torch.tensor(test_seq_num, dtype=torch.float32),
            torch.tensor(test_seq_cat, dtype=torch.long),
            torch.tensor(test_static_num, dtype=torch.float32),
            torch.tensor(test_static_cat, dtype=torch.long),
        ).cpu().numpy()
    # Invert the training-time target transform back to original units.
    if target_transform == 'log1p':
        y_true = np.expm1(y_test)
        y_pred = np.expm1(predictions)
    else:
        y_true = y_test
        y_pred = predictions
    y_true = np.asarray(y_true, dtype=float)
    # Absence hours cannot be negative, so predictions are clipped at zero.
    y_pred = np.clip(np.asarray(y_pred, dtype=float), a_min=0.0, a_max=None)
    residuals = y_pred - y_true
    prediction_df = pd.DataFrame({
        '真实值': np.round(y_true, 4),
        '预测值': np.round(y_pred, 4),
        '残差': np.round(residuals, 4),
        '真实风险等级': classify_risk(y_true),
        '预测风险等级': classify_risk(y_pred),
    })
    # utf-8-sig adds a BOM so the Chinese headers open correctly in Excel.
    prediction_df.to_csv(PREDICTION_CSV, index=False, encoding='utf-8-sig')
    return prediction_df
def plot_model_comparison(metrics):
    """Draw side-by-side bar charts comparing R², RMSE and MAE per model."""
    keys = list(metrics.keys())
    display_names = [MODEL_DISPLAY_NAMES.get(key, key) for key in keys]
    # Highlight the deep-learning model in teal; grey out the others.
    colors = ['#0f766e' if key == 'lstm_mlp' else '#94a3b8' for key in keys]
    panels = [
        ('r2', '模型R2对比', 'R2'),
        ('rmse', '模型RMSE对比', 'RMSE'),
        ('mae', '模型MAE对比', 'MAE'),
    ]
    fig, axes = plt.subplots(1, 3, figsize=(18, 5))
    for ax, (metric_key, title, ylabel) in zip(axes, panels):
        ax.bar(display_names, [metrics[key][metric_key] for key in keys], color=colors)
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.tick_params(axis='x', rotation=20)
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '01_模型性能对比.png', dpi=220, bbox_inches='tight')
    plt.close(fig)
def plot_actual_vs_pred(prediction_df):
    """Scatter predicted vs. true absenteeism hours with a y=x reference line."""
    actual = prediction_df['真实值'].to_numpy()
    predicted = prediction_df['预测值'].to_numpy()
    axis_limit = max(float(actual.max()), float(predicted.max()))
    fig, ax = plt.subplots(figsize=(7, 7))
    ax.scatter(actual, predicted, s=18, alpha=0.55, color='#0f766e', edgecolors='none')
    # Diagonal marks perfect prediction; deviation from it shows error.
    ax.plot([0, axis_limit], [0, axis_limit], color='#dc2626', linestyle='--', linewidth=1.5)
    ax.set_title('LSTM模型真实值与预测值对比')
    ax.set_xlabel('真实缺勤时长(小时)')
    ax.set_ylabel('预测缺勤时长(小时)')
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '02_LSTM真实值_vs_预测值.png', dpi=220, bbox_inches='tight')
    plt.close(fig)
def plot_residuals(prediction_df):
    """Plot residual histogram (left) and residual-vs-prediction scatter (right)."""
    predicted = prediction_df['预测值'].to_numpy()
    residual_values = prediction_df['残差'].to_numpy()
    fig, (hist_ax, scatter_ax) = plt.subplots(1, 2, figsize=(14, 5))
    hist_ax.hist(residual_values, bins=30, color='#2563eb', alpha=0.85, edgecolor='white')
    hist_ax.axvline(0, color='#dc2626', linestyle='--', linewidth=1.2)
    hist_ax.set_title('LSTM残差分布')
    hist_ax.set_xlabel('残差(预测值 - 真实值)')
    hist_ax.set_ylabel('样本数')
    scatter_ax.scatter(predicted, residual_values, s=18, alpha=0.55, color='#7c3aed', edgecolors='none')
    scatter_ax.axhline(0, color='#dc2626', linestyle='--', linewidth=1.2)
    scatter_ax.set_title('LSTM残差散点图')
    scatter_ax.set_xlabel('预测缺勤时长(小时)')
    scatter_ax.set_ylabel('残差')
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '03_LSTM残差分析.png', dpi=220, bbox_inches='tight')
    plt.close(fig)
def plot_confusion_matrix(prediction_df):
    """Render the 3x3 risk-level confusion matrix as an annotated heatmap."""
    level_order = ['低风险', '中风险', '高风险']
    cm = confusion_matrix(
        prediction_df['真实风险等级'],
        prediction_df['预测风险等级'],
        labels=level_order,
    )
    fig, ax = plt.subplots(figsize=(6, 5))
    heatmap = ax.imshow(cm, cmap='GnBu')
    ax.set_title('LSTM风险等级混淆矩阵')
    ax.set_xlabel('预测类别')
    ax.set_ylabel('真实类别')
    tick_positions = range(len(level_order))
    ax.set_xticks(tick_positions)
    ax.set_xticklabels(level_order)
    ax.set_yticks(tick_positions)
    ax.set_yticklabels(level_order)
    # Annotate each cell with its raw sample count.
    for (row, col), count in np.ndenumerate(cm):
        ax.text(col, row, int(count), ha='center', va='center', color='#111827')
    fig.colorbar(heatmap, ax=ax, fraction=0.046, pad=0.04)
    fig.tight_layout()
    fig.savefig(OUTPUT_DIR / '04_LSTM风险等级混淆矩阵.png', dpi=220, bbox_inches='tight')
    plt.close(fig)
def plot_feature_importance():
    """Plot a Top-15 feature-importance chart from the first available tree model.

    Tries candidate model pickles in priority order and stops at the first
    one exposing `feature_importances_`. Returns that model's key, or None
    when no suitable model file exists.
    """
    candidates = [
        ('xgboost', MODELS_DIR / 'xgboost_model.pkl'),
        ('random_forest', MODELS_DIR / 'random_forest_model.pkl'),
        ('extra_trees', MODELS_DIR / 'extra_trees_model.pkl'),
    ]
    selected_path = MODELS_DIR / 'selected_features.pkl'
    names_path = MODELS_DIR / 'feature_names.pkl'
    selected_features = joblib.load(selected_path) if selected_path.exists() else None
    feature_names = joblib.load(names_path) if names_path.exists() else None
    for model_key, model_file in candidates:
        if not model_file.exists():
            continue
        estimator = joblib.load(model_file)
        if not hasattr(estimator, 'feature_importances_'):
            continue
        importances = estimator.feature_importances_
        # Prefer persisted feature names; fall back to synthetic placeholders
        # whenever names are absent or their length disagrees.
        names = selected_features or feature_names or [f'feature_{idx}' for idx in range(len(importances))]
        if len(names) != len(importances):
            names = [f'feature_{idx}' for idx in range(len(importances))]
        ranked = sorted(zip(names, importances), key=lambda pair: pair[1], reverse=True)
        top_items = list(reversed(ranked[:15]))  # barh draws bottom-up
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.barh(
            [config.FEATURE_NAME_CN.get(name, name) for name, _ in top_items],
            [float(score) for _, score in top_items],
            color='#0f766e',
        )
        ax.set_title(f'{MODEL_DISPLAY_NAMES.get(model_key, model_key)}特征重要性 Top15')
        ax.set_xlabel('重要性')
        fig.tight_layout()
        fig.savefig(OUTPUT_DIR / '05_特征重要性_Top15.png', dpi=220, bbox_inches='tight')
        plt.close(fig)
        return model_key
    return None
def save_summary(metrics, prediction_df, feature_model_name):
    """Persist a pretty-printed UTF-8 JSON summary of the evaluation run."""
    residual_values = prediction_df['残差'].to_numpy()
    level_match = prediction_df['真实风险等级'] == prediction_df['预测风险等级']
    payload = {
        # metrics is already sorted best-first, so its first key is the winner.
        'best_model': next(iter(metrics.keys())),
        'metrics': metrics,
        'lstm_prediction_summary': {
            'prediction_count': int(len(prediction_df)),
            'residual_mean': round(float(residual_values.mean()), 4),
            'residual_std': round(float(residual_values.std()), 4),
            'risk_accuracy': round(float(level_match.mean()), 4),
        },
        'feature_importance_model': feature_model_name,
        'generated_files': sorted(entry.name for entry in OUTPUT_DIR.iterdir() if entry.is_file()),
    }
    SUMMARY_JSON.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8')
def main():
    """Entry point: generate all evaluation figures plus CSV/JSON artifacts."""
    ensure_output_dir()  # must run first so savefig/CSV targets exist
    metrics = load_metrics()
    prediction_df = load_lstm_predictions()
    plot_model_comparison(metrics)
    plot_actual_vs_pred(prediction_df)
    plot_residuals(prediction_df)
    plot_confusion_matrix(prediction_df)
    feature_model_name = plot_feature_importance()
    # Summary goes last so generated_files reflects every figure written above.
    save_summary(metrics, prediction_df, feature_model_name)
    for message in (
        f'评估图片已生成: {OUTPUT_DIR}',
        f'LSTM预测明细: {PREDICTION_CSV}',
        f'评估摘要: {SUMMARY_JSON}',
    ):
        print(message)


if __name__ == '__main__':
    main()