Files
forsetsystem/backend/services/jdr_service.py
shuo e8235bf3ca feat: 添加 JD-R 理论分析模块与 SHAP 可解释性分析功能
- 后端新增 JD-R(工作要求-资源)理论维度数据生成,包含工作要求、工作资源、
    个人资源、中介变量共 16 个新特征列
  - 新增 JD-R 分析服务与 API(维度统计、倦怠投入分析、双路径中介分析、
    分组轮廓、风险分布)
  - 新增 SHAP 可解释性分析模块(全局重要性、局部解释、特征交互、依赖图)
  - 预测服务增加风险分类模型加载与概率预测能力
  - 前端新增 JD-R 分析页面(JDRAnalysis.vue),含雷达图、散点图、路径分析等可视化
  - 预测页面增加风险概率展示与 SHAP 特征解释
  - 路由与导航菜单同步更新
2026-04-04 07:15:46 +08:00

213 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import pandas as pd
import config
from core.model_features import engineer_features
from core.preprocessing import get_clean_data
class JDRService:
    """Analysis service for the JD-R (Job Demands-Resources) model.

    The cleaned, feature-engineered data frame is loaded on first use and
    cached on the instance. Every public method returns plain Python
    containers (dicts, lists, floats, ints) computed from that frame.
    """

    # A path analysis is only reported when strictly more than this many
    # complete rows are available.
    _MIN_PATH_SAMPLES = 30

    def __init__(self):
        self._df = None

    def _ensure_data(self) -> None:
        """Load and cache the feature-engineered data frame on first use."""
        if self._df is None:
            # Assign only after both steps succeed, so a failure inside
            # engineer_features cannot leave a frame without the engineered
            # columns cached for every later call.
            self._df = engineer_features(get_clean_data())

    def get_dimension_scores(self) -> dict:
        """Return summary statistics for the three JD-R index columns.

        Returns:
            A dict with up to four entries. ``demands``, ``resources`` and
            ``personal`` each hold ``mean``, ``std`` and ``median`` (rounded
            to two places) plus a seven-bin ``distribution`` spanning the
            observed minimum and maximum. ``balance`` holds the mean of the
            balance column and the percentage of rows where it is positive.
            An entry is omitted when its column is absent or has no
            non-null values.
        """
        self._ensure_data()
        df = self._df
        result = {}
        dimensions = (
            ('demands', '工作要求指数'),
            ('resources', '工作资源指数'),
            ('personal', '个人资源指数'),
        )
        for dim_key, col_name in dimensions:
            if col_name not in df.columns:
                continue
            vals = df[col_name].dropna()
            if vals.empty:
                # Treat an all-null column like a missing one instead of
                # emitting NaN statistics and "nan-nan" bin labels.
                continue
            result[dim_key] = {
                'mean': round(float(vals.mean()), 2),
                'std': round(float(vals.std()), 2),
                'median': round(float(vals.median()), 2),
                'distribution': self._make_distribution(
                    vals, vals.min(), vals.max(), 7
                ),
            }

        # JD-R balance
        if 'JD-R平衡度' in df.columns:
            balance = df['JD-R平衡度'].dropna()
            if not balance.empty:
                result['balance'] = {
                    'mean': round(float(balance.mean()), 2),
                    'positive_ratio': round(
                        float((balance > 0).mean()) * 100, 1
                    ),
                }
        return result

    def get_burnout_engagement_analysis(self) -> dict:
        """Return burnout and engagement statistics plus their correlations.

        Returns:
            A dict with up to three entries. ``burnout`` and ``engagement``
            hold mean, std, a ratio (share of values ``>= 5`` for burnout,
            ``<= 3`` for engagement, as a percentage) and a seven-bin
            distribution over the range 1..7; each is omitted when its
            column is absent or has no non-null values. ``correlations``
            maps ``"<a>_vs_<b>"`` to the correlation of every ordered pair
            of available columns and is present when at least two of the
            candidate columns exist.
        """
        self._ensure_data()
        df = self._df
        result = {}

        if '工作倦怠' in df.columns:
            burnout = df['工作倦怠'].dropna()
            if not burnout.empty:
                result['burnout'] = {
                    'mean': round(float(burnout.mean()), 2),
                    'std': round(float(burnout.std()), 2),
                    'high_risk_ratio': round(
                        float((burnout >= 5).mean()) * 100, 1
                    ),
                    'distribution': self._make_distribution(burnout, 1, 7, 7),
                }

        if '工作投入' in df.columns:
            engagement = df['工作投入'].dropna()
            if not engagement.empty:
                result['engagement'] = {
                    'mean': round(float(engagement.mean()), 2),
                    'std': round(float(engagement.std()), 2),
                    'low_engagement_ratio': round(
                        float((engagement <= 3).mean()) * 100, 1
                    ),
                    'distribution': self._make_distribution(
                        engagement, 1, 7, 7
                    ),
                }

        # Correlation analysis over whichever candidate columns exist.
        candidates = (
            ('burnout', '工作倦怠'),
            ('engagement', '工作投入'),
            ('demands', '工作要求指数'),
            ('resources', '工作资源指数'),
            ('absence_hours', config.TARGET_COLUMN),
        )
        corr_cols = {key: col for key, col in candidates if col in df.columns}
        if len(corr_cols) >= 2:
            # Rows with a null in any selected column are dropped first.
            corr_matrix = df[list(corr_cols.values())].dropna().corr()
            correlations = {}
            for k1, v1 in corr_cols.items():
                for k2, v2 in corr_cols.items():
                    if k1 == k2:
                        continue
                    # Guard against columns absent from the matrix.
                    if v1 in corr_matrix.index and v2 in corr_matrix.columns:
                        correlations[f'{k1}_vs_{k2}'] = round(
                            float(corr_matrix.loc[v1, v2]), 3
                        )
            result['correlations'] = correlations
        return result

    def _mediation_path(self, df, predictor_col, mediator_col, target,
                        predictor, mediator):
        """Compute one predictor -> mediator -> target correlation path.

        Args:
            df: data frame to read from.
            predictor_col: column name of the predictor.
            mediator_col: column name of the mediator.
            target: column name of the outcome.
            predictor: short label used to build the result keys.
            mediator: short label used to build the result keys.

        Returns:
            A dict of rounded correlations, or ``None`` when a column is
            missing or there are not more than ``_MIN_PATH_SAMPLES``
            complete rows. The "direct" value is the plain
            predictor-target correlation and the "indirect" value is the
            product of the two leg correlations.
        """
        cols = [predictor_col, mediator_col, target]
        if not all(col in df.columns for col in cols):
            return None
        sub = df[cols].dropna()
        if len(sub) <= self._MIN_PATH_SAMPLES:
            return None

        r_pred_med = sub[predictor_col].corr(sub[mediator_col])
        r_med_target = sub[mediator_col].corr(sub[target])
        r_pred_target = sub[predictor_col].corr(sub[target])
        indirect = r_pred_med * r_med_target
        # Avoid dividing by a zero correlation.
        ratio = float(indirect / r_pred_target) if r_pred_target != 0 else 0
        return {
            f'direct_effect_{predictor}': round(float(r_pred_target), 3),
            f'indirect_via_{mediator}': round(float(indirect), 3),
            'mediation_ratio': round(ratio, 3),
            f'{predictor}_to_{mediator}': round(float(r_pred_med), 3),
            f'{mediator}_to_absence': round(float(r_med_target), 3),
        }

    def get_jdr_path_analysis(self) -> dict:
        """Return the two JD-R mediation paths.

        Returns:
            A dict with ``health_impairment`` (demands -> burnout ->
            absence) and ``motivational`` (resources -> engagement ->
            absence). A path is omitted when any of its columns is missing
            or it has 30 or fewer complete rows.
        """
        self._ensure_data()
        df = self._df
        target = config.TARGET_COLUMN
        paths = (
            ('health_impairment', '工作要求指数', '工作倦怠',
             'demands', 'burnout'),
            ('motivational', '工作资源指数', '工作投入',
             'resources', 'engagement'),
        )
        result = {}
        for name, predictor_col, mediator_col, predictor, mediator in paths:
            path = self._mediation_path(
                df, predictor_col, mediator_col, target, predictor, mediator
            )
            if path is not None:
                result[name] = path
        return result

    def get_jdr_profile(self, dimension='所属行业') -> dict:
        """Return mean JD-R scores for each group of ``dimension``.

        Args:
            dimension: name of the column to group by.

        Returns:
            ``{'dimension': ..., 'profiles': [...]}`` with one profile per
            group holding ``group_name``, the mean of every available score
            column and, when the target column exists, ``avg_absence_hours``.
            ``{'error': ...}`` is returned when the dimension is unknown or
            no score column is available.
        """
        self._ensure_data()
        df = self._df
        if dimension not in df.columns:
            return {'error': f'Dimension {dimension} not found'}

        score_cols = ('工作要求指数', '工作资源指数', '个人资源指数',
                      '工作倦怠', '工作投入')
        # The grouping column is excluded from the aggregated columns:
        # selecting it twice makes groupby fail on the duplicated label.
        existing_cols = [
            c for c in score_cols if c in df.columns and c != dimension
        ]
        if not existing_cols:
            return {'error': 'JD-R scores not computed'}

        target = config.TARGET_COLUMN
        has_target = target in df.columns and target != dimension
        value_cols = list(existing_cols)
        if has_target and target not in value_cols:
            value_cols.append(target)

        # Only the group means are reported.
        means = df.groupby(dimension)[value_cols].mean().round(2)

        profiles = []
        for group_name, row in means.iterrows():
            profile = {'group_name': str(group_name)}
            for col in existing_cols:
                profile[col] = round(float(row[col]), 2)
            if has_target:
                profile['avg_absence_hours'] = round(float(row[target]), 2)
            profiles.append(profile)
        return {'dimension': dimension, 'profiles': profiles}

    def get_risk_distribution(self) -> dict:
        """Return how rows split into low / medium / high absence tiers.

        Tiers are ``< 4`` hours (low), ``4..8`` inclusive (medium) and
        ``> 8`` (high).

        Returns:
            ``{'levels': [...], 'total': n}`` where each level carries its
            identifier, label, colour, row count, percentage of all rows and
            mean hours (0 when the tier is empty), or ``{'error': ...}`` when
            the target column is missing.
        """
        self._ensure_data()
        df = self._df
        target = config.TARGET_COLUMN
        if target not in df.columns:
            return {'error': 'Target column not found'}

        hours = df[target]
        # Build each mask once and reuse it for count, share and mean.
        tiers = (
            ('low', '低风险', '#22c55e', hours < 4),
            ('medium', '中风险', '#f59e0b', (hours >= 4) & (hours <= 8)),
            ('high', '高风险', '#ef4444', hours > 8),
        )
        levels = [
            {
                'level': level,
                'label': label,
                'color': color,
                'count': int(mask.sum()),
                'percentage': round(float(mask.mean()) * 100, 1),
                'avg_hours': (
                    round(float(hours[mask].mean()), 2) if mask.any() else 0
                ),
            }
            for level, label, color, mask in tiers
        ]
        return {'levels': levels, 'total': len(hours)}

    def _make_distribution(self, series, low, high, n_bins) -> list:
        """Histogram ``series`` into ``n_bins`` equal-width bins.

        Args:
            series: numeric values to count.
            low: left edge of the first bin.
            high: right edge of the last bin.
            n_bins: number of bins.

        Returns:
            A list of ``{'range': 'lo-hi', 'count': n}`` dicts, one per bin,
            with the edges rounded to one decimal place in the label.
        """
        bins = np.linspace(low, high, n_bins + 1)
        hist, edges = np.histogram(series, bins=bins)
        return [
            {'range': f'{round(lo, 1)}-{round(hi, 1)}', 'count': int(count)}
            for count, lo, hi in zip(hist, edges[:-1], edges[1:])
        ]
# Module-level JDRService instance. Constructing it only sets the cached data
# frame to None; the data itself is loaded on the first analysis call.
jdr_service = JDRService()