import numpy as np
import pandas as pd

import config
from core.model_features import engineer_features
from core.preprocessing import get_clean_data


class JDRService:
    """Analysis service based on the JD-R (Job Demands-Resources) model.

    The underlying DataFrame is loaded lazily on first use (see
    ``_ensure_data``) and then cached on the instance for all later calls.
    Every public method returns plain ``dict``/``list`` structures made of
    Python scalars.
    """

    def __init__(self):
        # Cached, feature-engineered DataFrame; populated by _ensure_data().
        self._df = None

    def _ensure_data(self) -> None:
        """Load and feature-engineer the dataset once, caching the result.

        The frame is built completely before it is stored, so a failure in
        ``engineer_features`` cannot leave a half-prepared (un-engineered)
        frame cached for later calls.
        """
        if self._df is None:
            self._df = engineer_features(get_clean_data())

    @staticmethod
    def _rounded(value, digits: int = 2) -> float:
        """Convert *value* to a Python float rounded to *digits* decimals."""
        return round(float(value), digits)

    @staticmethod
    def _percentage(mask) -> float:
        """Return the share of True entries in *mask* as a percentage (1 decimal)."""
        return round(float(mask.mean()) * 100, 1)

    def get_dimension_scores(self) -> dict:
        """Return summary statistics for the three JD-R dimension indices.

        Returns:
            A dict that may contain the keys ``'demands'``, ``'resources'``
            and ``'personal'`` (each with ``mean``, ``std``, ``median`` and a
            7-bin ``distribution`` spanning the observed min..max), plus
            ``'balance'`` (``mean`` and ``positive_ratio`` in percent).
            A key is omitted when its column is absent or has no non-null
            values.
        """
        self._ensure_data()
        df = self._df
        result = {}

        dimensions = (
            ('demands', '工作要求指数'),
            ('resources', '工作资源指数'),
            ('personal', '个人资源指数'),
        )
        for dim_key, col_name in dimensions:
            if col_name not in df.columns:
                continue
            vals = df[col_name].dropna()
            if vals.empty:
                # min()/max() of an empty series are NaN, which would yield
                # 'nan-nan' bin labels and NaN statistics; treat the column
                # like a missing one instead.
                continue
            result[dim_key] = {
                'mean': self._rounded(vals.mean()),
                'std': self._rounded(vals.std()),
                'median': self._rounded(vals.median()),
                'distribution': self._make_distribution(
                    vals, vals.min(), vals.max(), 7
                ),
            }

        # JD-R balance score
        if 'JD-R平衡度' in df.columns:
            balance = df['JD-R平衡度'].dropna()
            if not balance.empty:
                result['balance'] = {
                    'mean': self._rounded(balance.mean()),
                    'positive_ratio': self._percentage(balance > 0),
                }

        return result

    def get_burnout_engagement_analysis(self) -> dict:
        """Return burnout / engagement statistics and pairwise correlations.

        Returns:
            A dict that may contain:

            * ``'burnout'``: ``mean``, ``std``, ``high_risk_ratio`` (percent
              of values >= 5) and a ``distribution`` over 7 bins on [1, 7].
            * ``'engagement'``: ``mean``, ``std``, ``low_engagement_ratio``
              (percent of values <= 3) and the same kind of ``distribution``.
            * ``'correlations'``: ``'<a>_vs_<b>'`` -> Pearson correlation
              (3 decimals) for every ordered pair of the available columns,
              computed on rows where all of those columns are non-null.

            The fixed [1, 7] histogram range presumably reflects a 7-point
            scale -- confirm against the data definition.
        """
        self._ensure_data()
        df = self._df
        result = {}

        if '工作倦怠' in df.columns:
            burnout = df['工作倦怠'].dropna()
            if not burnout.empty:
                result['burnout'] = {
                    'mean': self._rounded(burnout.mean()),
                    'std': self._rounded(burnout.std()),
                    'high_risk_ratio': self._percentage(burnout >= 5),
                    'distribution': self._make_distribution(burnout, 1, 7, 7),
                }

        if '工作投入' in df.columns:
            engagement = df['工作投入'].dropna()
            if not engagement.empty:
                result['engagement'] = {
                    'mean': self._rounded(engagement.mean()),
                    'std': self._rounded(engagement.std()),
                    'low_engagement_ratio': self._percentage(engagement <= 3),
                    'distribution': self._make_distribution(engagement, 1, 7, 7),
                }

        # Correlation analysis over whichever of these columns exist.
        candidates = (
            ('burnout', '工作倦怠'),
            ('engagement', '工作投入'),
            ('demands', '工作要求指数'),
            ('resources', '工作资源指数'),
            ('absence_hours', config.TARGET_COLUMN),
        )
        corr_cols = {key: col for key, col in candidates if col in df.columns}

        if len(corr_cols) >= 2:
            corr_matrix = df[list(corr_cols.values())].dropna().corr()
            correlations = {}
            for k1, v1 in corr_cols.items():
                for k2, v2 in corr_cols.items():
                    if k1 == k2:
                        continue
                    # corr() may omit columns it cannot correlate, so check
                    # that both labels survived before indexing.
                    if v1 in corr_matrix.index and v2 in corr_matrix.columns:
                        correlations[f'{k1}_vs_{k2}'] = round(
                            float(corr_matrix.loc[v1, v2]), 3
                        )
            result['correlations'] = correlations

        return result

    @staticmethod
    def _path_correlations(df, predictor, mediator, outcome, min_rows=30):
        """Return the three pairwise Pearson correlations of one path.

        Returns ``(r_predictor_mediator, r_mediator_outcome,
        r_predictor_outcome)``, or ``None`` when a column is missing or no
        more than *min_rows* complete rows are available.
        """
        cols = [predictor, mediator, outcome]
        if not all(col in df.columns for col in cols):
            return None
        sub = df[cols].dropna()
        if len(sub) <= min_rows:
            return None
        return (
            sub[predictor].corr(sub[mediator]),
            sub[mediator].corr(sub[outcome]),
            sub[predictor].corr(sub[outcome]),
        )

    @staticmethod
    def _mediation_ratio(indirect, total) -> float:
        """Return ``indirect / total`` rounded to 3 decimals, or 0 if total is 0."""
        return round(float(indirect / total) if total != 0 else 0, 3)

    def get_jdr_path_analysis(self) -> dict:
        """Return correlation-based estimates for the two JD-R paths.

        All figures are zero-order Pearson correlations computed on rows
        that are complete for the three columns of a path; the "indirect"
        value is the product of the two correlations along the path, and
        ``mediation_ratio`` is that product divided by the
        predictor-to-outcome correlation (0 when that correlation is 0).

        Returns:
            A dict that may contain ``'health_impairment'`` (demands ->
            burnout -> absence) and ``'motivational'`` (resources ->
            engagement -> absence). A path is omitted when one of its
            columns is missing or it has 30 or fewer complete rows.
        """
        self._ensure_data()
        df = self._df
        result = {}
        target = config.TARGET_COLUMN

        # Health-impairment path: demands -> burnout -> absence
        health = self._path_correlations(df, '工作要求指数', '工作倦怠', target)
        if health is not None:
            r_demands_burnout, r_burnout_absence, r_demands_absence = health
            indirect = r_demands_burnout * r_burnout_absence
            result['health_impairment'] = {
                'direct_effect_demands': round(float(r_demands_absence), 3),
                'indirect_via_burnout': round(float(indirect), 3),
                'mediation_ratio': self._mediation_ratio(
                    indirect, r_demands_absence
                ),
                'demands_to_burnout': round(float(r_demands_burnout), 3),
                'burnout_to_absence': round(float(r_burnout_absence), 3),
            }

        # Motivational path: resources -> engagement -> lower absence
        motivational = self._path_correlations(
            df, '工作资源指数', '工作投入', target
        )
        if motivational is not None:
            (r_resources_engagement,
             r_engagement_absence,
             r_resources_absence) = motivational
            indirect = r_resources_engagement * r_engagement_absence
            result['motivational'] = {
                'direct_effect_resources': round(float(r_resources_absence), 3),
                'indirect_via_engagement': round(float(indirect), 3),
                'mediation_ratio': self._mediation_ratio(
                    indirect, r_resources_absence
                ),
                'resources_to_engagement': round(
                    float(r_resources_engagement), 3
                ),
                'engagement_to_absence': round(float(r_engagement_absence), 3),
            }

        return result

    def get_jdr_profile(self, dimension='所属行业') -> dict:
        """Return per-group mean JD-R scores, grouped by *dimension*.

        Args:
            dimension: Name of the column to group by.

        Returns:
            ``{'dimension': ..., 'profiles': [...]}`` where each profile has
            ``group_name``, one entry per available score column (keyed by
            the column name) and, when the target column exists,
            ``avg_absence_hours``. Returns ``{'error': ...}`` when
            *dimension* is not a column or no score column is present.
        """
        self._ensure_data()
        df = self._df

        if dimension not in df.columns:
            return {'error': f'Dimension {dimension} not found'}

        score_cols = ['工作要求指数', '工作资源指数', '个人资源指数', '工作倦怠', '工作投入']
        existing_cols = [c for c in score_cols if c in df.columns]
        if not existing_cols:
            return {'error': 'JD-R scores not computed'}

        has_target = config.TARGET_COLUMN in df.columns
        group_cols = [dimension] + existing_cols
        if has_target:
            group_cols.append(config.TARGET_COLUMN)

        # Only the group means are reported, so only the mean is aggregated.
        grouped = df[group_cols].groupby(dimension).mean().round(2)

        profiles = []
        for group_name in grouped.index:
            profile = {'group_name': str(group_name)}
            for col in existing_cols:
                profile[col] = round(float(grouped.loc[group_name, col]), 2)
            if has_target:
                profile['avg_absence_hours'] = round(
                    float(grouped.loc[group_name, config.TARGET_COLUMN]), 2
                )
            profiles.append(profile)

        return {'dimension': dimension, 'profiles': profiles}

    def get_risk_distribution(self) -> dict:
        """Return how many rows fall into each risk band of the target column.

        Bands are defined on the target value: low (< 4), medium (4 to 8
        inclusive) and high (> 8).

        Returns:
            ``{'levels': [...], 'total': <row count>}`` where each level has
            ``level``, ``label``, ``color``, ``count``, ``percentage`` (of
            all rows) and ``avg_hours`` (0 when the band is empty), or
            ``{'error': ...}`` when the target column is missing.
        """
        self._ensure_data()
        df = self._df
        target = config.TARGET_COLUMN

        if target not in df.columns:
            return {'error': 'Target column not found'}

        hours = df[target]
        total = len(hours)

        # Each boolean mask is computed once and reused for all statistics.
        bands = (
            ('low', '低风险', '#22c55e', hours < 4),
            ('medium', '中风险', '#f59e0b', (hours >= 4) & (hours <= 8)),
            ('high', '高风险', '#ef4444', hours > 8),
        )
        levels = [
            {
                'level': level,
                'label': label,
                'color': color,
                'count': int(mask.sum()),
                # The mean of an empty mask is NaN; report 0.0 instead.
                'percentage': self._percentage(mask) if total else 0.0,
                'avg_hours': (
                    round(float(hours[mask].mean()), 2) if mask.any() else 0
                ),
            }
            for level, label, color, mask in bands
        ]

        return {'levels': levels, 'total': total}

    def _make_distribution(self, series, low, high, n_bins) -> list:
        """Histogram *series* into *n_bins* equal-width bins on [low, high].

        Values outside [low, high] are not counted (``np.histogram``
        semantics).

        Returns:
            A list of ``{'range': '<lo>-<hi>', 'count': int}`` dicts, one per
            bin, with edges rounded to 1 decimal in the label.
        """
        bins = np.linspace(low, high, n_bins + 1)
        hist, edges = np.histogram(series, bins=bins)
        return [
            {'range': f'{round(lo, 1)}-{round(hi, 1)}', 'count': int(count)}
            for lo, hi, count in zip(edges[:-1], edges[1:], hist)
        ]


jdr_service = JDRService()