# forsetsystem/backend/core/shap_analysis.py
import os
import joblib
import numpy as np
import pandas as pd
import config
# Optional dependency: SHAP explanations degrade gracefully when the
# library is not installed (callers check SHAP_AVAILABLE first).
try:
    import shap
except ImportError:
    SHAP_AVAILABLE = False
else:
    SHAP_AVAILABLE = True
class SHAPAnalyzer:
    """SHAP-value based explainability analyzer; aggregates explanations by JD-R dimension."""
def __init__(self):
    """Create an empty analyzer; all artefacts load lazily via _ensure_initialized."""
    # Loaded model artefacts (model name -> estimator / explainer).
    self.models = {}
    self.explainers = {}
    # Preprocessing artefacts restored from disk on first use.
    self.scaler = None
    self.feature_names = None
    self.selected_features = None
    self.label_encoders = {}
    # Caches: background samples keyed by sample size, SHAP results keyed by model type.
    self.background_data = {}
    self.global_result_cache = {}
    self.global_matrix_cache = {}
    self._initialized = False
def _ensure_initialized(self):
    """Lazily load regression models and preprocessing artefacts (idempotent).

    SHAP analysis is based on the regression models; every artefact is
    optional and loaded best-effort — load failures leave the slot empty.
    """
    if self._initialized:
        return
    models_dir = config.MODELS_DIR
    # Each model file follows the '<name>_model.pkl' naming convention.
    for model_name in ('random_forest', 'xgboost', 'lightgbm',
                       'gradient_boosting', 'extra_trees'):
        model_path = os.path.join(models_dir, f'{model_name}_model.pkl')
        if not os.path.exists(model_path):
            continue
        try:
            self.models[model_name] = joblib.load(model_path)
        except Exception:
            # Best-effort: a model that fails to unpickle is simply unavailable.
            pass
    # Preprocessing artefacts.
    if os.path.exists(config.SCALER_PATH):
        self.scaler = joblib.load(config.SCALER_PATH)
    for artefact_file, attr_name in (
        ('feature_names.pkl', 'feature_names'),
        ('selected_features.pkl', 'selected_features'),
        ('label_encoders.pkl', 'label_encoders'),
    ):
        artefact_path = os.path.join(models_dir, artefact_file)
        if os.path.exists(artefact_path):
            try:
                setattr(self, attr_name, joblib.load(artefact_path))
            except Exception:
                pass
    self._initialized = True
def _get_tree_explainer(self, model_type='random_forest'):
    """Return a cached shap.TreeExplainer for *model_type*, or None if unavailable."""
    if not SHAP_AVAILABLE:
        return None
    cached = self.explainers.get(model_type)
    if cached is not None:
        return cached
    model = self.models.get(model_type)
    if model is None:
        return None
    try:
        explainer = shap.TreeExplainer(model)
    except Exception:
        # Unsupported / non-tree model: no explainer for this type.
        return None
    self.explainers[model_type] = explainer
    return explainer
def _get_background_sample(self, n_samples=500):
    """Build (and cache) a background matrix of up to *n_samples* rows for SHAP.

    Replays the training-time preprocessing pipeline on the clean dataset:
    column normalization, outlier clipping, feature engineering, label
    encoding, optional scaling and feature selection.  Returns a float
    matrix, or None on any failure (best-effort).
    """
    if n_samples in self.background_data:
        return self.background_data[n_samples]
    try:
        from core.preprocessing import get_clean_data
        from core.model_features import (
            normalize_columns, prepare_modeling_dataframe,
            apply_outlier_bounds, fit_outlier_bounds,
            engineer_features, extract_xy, fit_label_encoders,
            apply_label_encoders, align_feature_frame, to_float_array,
            NUMERICAL_OUTLIER_COLUMNS, ORDINAL_COLUMNS,
        )
        frame = prepare_modeling_dataframe(normalize_columns(get_clean_data()))
        frame = apply_outlier_bounds(frame, fit_outlier_bounds(frame, NUMERICAL_OUTLIER_COLUMNS))
        frame = engineer_features(frame)
        features_df, _target = extract_xy(frame)
        # NOTE(review): encoders are re-fit here rather than reusing
        # self.label_encoders as local_shap_values does — confirm intended.
        features_df, _encoders = fit_label_encoders(features_df, ORDINAL_COLUMNS)
        if self.feature_names:
            features_df = align_feature_frame(features_df, self.feature_names)
        if n_samples < len(features_df):
            features_df = features_df.sample(n=n_samples, random_state=config.RANDOM_STATE)
        matrix = to_float_array(features_df)
        if self.scaler is not None:
            matrix = self.scaler.transform(matrix)
        # Restrict to the feature subset selected at training time.
        if self.selected_features and self.feature_names:
            cols = [self.feature_names.index(name)
                    for name in self.selected_features
                    if name in self.feature_names]
            if cols:
                matrix = matrix[:, cols]
        self.background_data[n_samples] = matrix
        return matrix
    except Exception:
        # Best-effort: background data is optional.
        return None
def _get_feature_display_names(self):
    """Map internal feature names to display names, falling back to the raw name."""
    names = self.selected_features or self.feature_names or []
    mapping = {}
    for name in names:
        mapping[name] = config.FEATURE_NAME_CN.get(name, name)
    return mapping
def _map_feature_to_dimension(self, feature_name):
    """Classify *feature_name* into a JD-R dimension key, 'event_context', or 'other'."""
    for dim_key, dim_info in config.JDR_DIMENSIONS.items():
        if feature_name in dim_info['features']:
            return dim_key
    # Event / context features that sit outside the JD-R dimensions.
    if feature_name in {'缺勤月份', '星期几', '是否节假日前后', '季节',
                        '请假类型', '请假原因大类', '是否提供医院证明',
                        '是否临时请假', '是否连续缺勤', '前一工作日是否加班'}:
        return 'event_context'
    return 'other'
def global_shap_values(self, model_type='random_forest'):
    """Compute global SHAP importance for *model_type*, grouped by JD-R dimension.

    Returns a dict with 'dimensions' (per-dimension features sorted by
    importance), 'top_features' (top 20 overall) and 'model_type', or
    {'error': ...} on failure.  The result and the raw SHAP matrix are
    cached per model type for the interaction/dependence endpoints.
    """
    if not SHAP_AVAILABLE:
        return {'error': 'SHAP library not installed'}
    if model_type in self.global_result_cache:
        return self.global_result_cache[model_type]
    self._ensure_initialized()
    explainer = self._get_tree_explainer(model_type)
    if explainer is None:
        return {'error': f'No tree model available for {model_type}'}
    X = self._get_background_sample(n_samples=32)
    if X is None:
        return {'error': 'Failed to prepare background data'}
    try:
        shap_values = explainer.shap_values(X)
        # Multi-output models return a list of matrices; use the first output.
        if isinstance(shap_values, list):
            shap_values = shap_values[0]
        # Cache the raw matrix for shap_interaction / shap_dependence.
        self.global_matrix_cache[model_type] = {
            'X': X,
            'shap_values': shap_values,
        }
        mean_abs_shap = np.abs(shap_values).mean(axis=0)
        feature_names = self.selected_features or self.feature_names or []
        name_map = self._get_feature_display_names()
        # FIX: precompute name -> first column index once; the old code called
        # list(feature_names).index(fname) inside two loops (O(n^2) plus a
        # list copy per iteration).
        index_of = {}
        for pos, fname in enumerate(feature_names):
            index_of.setdefault(fname, pos)

        def _entry(fname):
            # One importance record, shared by both groupings below.
            return {
                'name': fname,
                'name_cn': name_map.get(fname, fname),
                'importance': round(float(mean_abs_shap[index_of[fname]]), 4),
            }

        # Group features by JD-R dimension.
        dimensions = {}
        for dim_key, dim_info in config.JDR_DIMENSIONS.items():
            dim_features = [_entry(f) for f in feature_names if f in dim_info['features']]
            if dim_features:
                dimensions[dim_key] = {
                    'name_cn': dim_info['name_cn'],
                    'features': sorted(dim_features, key=lambda x: x['importance'], reverse=True),
                }
        # Event-context pseudo-dimension.
        context_features = [
            _entry(f) for f in feature_names
            if self._map_feature_to_dimension(f) == 'event_context'
        ]
        if context_features:
            dimensions['event_context'] = {
                'name_cn': '事件上下文',
                'features': sorted(context_features, key=lambda x: x['importance'], reverse=True),
            }
        # Overall top-20 feature list.
        top_indices = np.argsort(mean_abs_shap)[::-1][:20]
        top_features = []
        for idx in top_indices:
            # Fall back to a synthetic name if the SHAP matrix is wider than
            # the known feature list.
            fname = feature_names[idx] if idx < len(feature_names) else f'f{idx}'
            top_features.append({
                'name': fname,
                'name_cn': name_map.get(fname, fname),
                'importance': round(float(mean_abs_shap[idx]), 4),
                'dimension': self._map_feature_to_dimension(fname),
            })
        result = {
            'model_type': model_type,
            'dimensions': dimensions,
            'top_features': top_features,
        }
        self.global_result_cache[model_type] = result
        return result
    except Exception as exc:
        return {'error': str(exc)}
def local_shap_values(self, data, model_type='random_forest'):
    """Explain a single prediction with per-feature SHAP values.

    *data* is the raw prediction payload accepted by
    build_prediction_dataframe.  Returns the base value, the top-20
    feature contributions and per-dimension contribution totals, or
    {'error': ...} on failure.
    """
    if not SHAP_AVAILABLE:
        return {'error': 'SHAP library not installed'}
    self._ensure_initialized()
    explainer = self._get_tree_explainer(model_type)
    if explainer is None:
        return {'error': f'No tree model available for {model_type}'}
    try:
        from core.model_features import (
            build_prediction_dataframe, engineer_features,
            apply_label_encoders, align_feature_frame, to_float_array,
        )
        # Replay the training preprocessing pipeline on the single record.
        X_df = build_prediction_dataframe(data)
        X_df = engineer_features(X_df)
        X_df = apply_label_encoders(X_df, self.label_encoders)
        if self.feature_names:
            X_df = align_feature_frame(X_df, self.feature_names)
        # BUGFIX: tolerate a missing scaler (consistent with
        # _get_background_sample); the old code dereferenced self.scaler
        # unconditionally.
        features = to_float_array(X_df)
        if self.scaler is not None:
            features = self.scaler.transform(features)
        if self.selected_features and self.feature_names:
            selected_indices = [self.feature_names.index(n)
                                for n in self.selected_features
                                if n in self.feature_names]
            if selected_indices:
                features = features[:, selected_indices]
        shap_values = explainer.shap_values(features)
        if isinstance(shap_values, list):
            shap_values = shap_values[0]
        # BUGFIX: unwrap an array-valued expected_value BEFORE float().
        # The old code called float() first, so its isinstance check was
        # dead code and a multi-element expected_value raised TypeError.
        base_value = explainer.expected_value
        if isinstance(base_value, (list, np.ndarray)):
            base_value = base_value[0]
        base_value = float(base_value)
        feature_names = self.selected_features or self.feature_names or []
        name_map = self._get_feature_display_names()
        feature_contributions = []
        dimension_contribution = {}
        for idx, fname in enumerate(feature_names):
            sv = float(shap_values[0][idx])
            fv = float(features[0][idx])
            dim = self._map_feature_to_dimension(fname)
            feature_contributions.append({
                'name': fname,
                'name_cn': name_map.get(fname, fname),
                'shap_value': round(sv, 4),
                'feature_value': round(fv, 4),
                'dimension': dim,
            })
            dimension_contribution[dim] = dimension_contribution.get(dim, 0) + sv
        feature_contributions.sort(key=lambda x: abs(x['shap_value']), reverse=True)
        # Human-readable dimension labels.
        dim_labels = {dk: di['name_cn'] for dk, di in config.JDR_DIMENSIONS.items()}
        dim_labels['event_context'] = '事件上下文'
        dim_labels['other'] = '其他'
        return {
            'base_value': round(base_value, 4),
            'features': feature_contributions[:20],
            'dimension_contribution': {
                dim_labels.get(k, k): round(v, 4)
                for k, v in sorted(dimension_contribution.items(),
                                   key=lambda x: abs(x[1]), reverse=True)
            },
        }
    except Exception as exc:
        return {'error': str(exc)}
def shap_interaction(self, model_type='random_forest', top_n=10):
    """Approximate pairwise SHAP interaction strength for *model_type*.

    Uses |Pearson correlation| between SHAP-value columns as the
    interaction proxy and returns the *top_n* strongest feature pairs,
    or {'error': ...}.  Relies on the SHAP matrix cached by
    global_shap_values (computed on demand).
    """
    if not SHAP_AVAILABLE:
        return {'error': 'SHAP library not installed'}
    self._ensure_initialized()
    # Populate the SHAP matrix cache if needed.
    if model_type not in self.global_matrix_cache:
        result = self.global_shap_values(model_type)
        if result.get('error'):
            return result
    try:
        cached = self.global_matrix_cache.get(model_type)
        if not cached:
            return {'error': 'Failed to prepare SHAP cache'}
        shap_values = np.asarray(cached['shap_values'])
        feature_names = self.selected_features or self.feature_names or []
        if shap_values.ndim != 2 or shap_values.shape[0] < 2:
            return {'error': 'Not enough SHAP samples for interaction analysis'}
        corr_matrix = np.nan_to_num(np.corrcoef(shap_values, rowvar=False))
        strength_matrix = np.abs(corr_matrix)
        np.fill_diagonal(strength_matrix, 0)
        # The matrix is symmetric, so every pair appears twice in the flat
        # ranking; scanning 2 * top_n candidates and keeping only i < j
        # yields up to top_n unique pairs (flat indices are unique, so no
        # extra de-duplication is needed).
        flat_idx = np.argsort(strength_matrix.ravel())[::-1][:top_n * 2]
        # FIX: hoisted out of the loop — the old code rebuilt the display-name
        # map on every candidate pair.
        name_map = self._get_feature_display_names()
        top_pairs = []
        for idx in flat_idx:
            i, j = divmod(idx, strength_matrix.shape[1])
            if i >= j:
                continue
            fi = feature_names[i] if i < len(feature_names) else f'f{i}'
            fj = feature_names[j] if j < len(feature_names) else f'f{j}'
            top_pairs.append({
                'feature_1': fi,
                'feature_1_cn': name_map.get(fi, fi),
                'feature_2': fj,
                'feature_2_cn': name_map.get(fj, fj),
                'strength': round(float(strength_matrix[i, j]), 4),
            })
            if len(top_pairs) >= top_n:
                break
        return {
            'model_type': model_type,
            'top_interactions': top_pairs,
        }
    except Exception as exc:
        return {'error': str(exc)}
def shap_dependence(self, feature_name, model_type='random_forest'):
    """Return (feature value, SHAP value) pairs for one feature's dependence plot.

    Uses the SHAP matrix cached by global_shap_values (computed on
    demand); down-samples to at most 300 points for visualization.
    Returns {'error': ...} on failure.
    """
    if not SHAP_AVAILABLE:
        return {'error': 'SHAP library not installed'}
    self._ensure_initialized()
    if model_type not in self.global_matrix_cache:
        result = self.global_shap_values(model_type)
        if result.get('error'):
            return result
    try:
        cached = self.global_matrix_cache.get(model_type)
        if not cached:
            return {'error': 'Failed to prepare SHAP cache'}
        feature_names = self.selected_features or self.feature_names or []
        if feature_name not in feature_names:
            return {'error': f'Feature {feature_name} not found'}
        col_idx = list(feature_names).index(feature_name)
        background = np.asarray(cached['X'])
        shap_matrix = np.asarray(cached['shap_values'])
        feature_vals = background[:, col_idx].tolist()
        shap_vals = shap_matrix[:, col_idx].tolist()
        # Deterministic down-sampling for visualization.
        max_points = 300
        if len(feature_vals) > max_points:
            rng = np.random.RandomState(config.RANDOM_STATE)
            keep = rng.choice(len(feature_vals), max_points, replace=False)
            feature_vals = [feature_vals[i] for i in keep]
            shap_vals = [shap_vals[i] for i in keep]
        name_map = self._get_feature_display_names()
        return {
            'feature': feature_name,
            'feature_cn': name_map.get(feature_name, feature_name),
            'values': [round(v, 4) for v in feature_vals],
            'shap_values': [round(v, 4) for v in shap_vals],
        }
    except Exception as exc:
        return {'error': str(exc)}