Files
forsetsystem/backend/services/analysis_service.py
shenjianZ e63267cef6 feat: 将数据集从国外员工缺勤数据替换为中国企业缺勤模拟数据
- 新增中国企业员工缺勤模拟数据集生成脚本(generate_dataset.py),覆盖7个行业、180家企业、2600名员工
  - 重构 config.py,更新特征字段为中文名称,调整目标列、员工ID、行业类型等配置
  - 重构 clustering.py,简化聚类逻辑,更新聚类特征和群体命名(高压通勤型、健康波动型等)
  - 重构 feature_mining.py,更新相关性分析和群体比较维度(按行业、班次、婚姻状态等)
  - 新增 model_features.py 定义模型训练特征
  - 更新 preprocessing.py 和 train_model.py 适配新数据结构
  - 更新各 API 路由默认参数(model: random_forest, dimension: industry)
  - 前端更新主题样式和各视图组件适配中文字段
  - 更新系统名称为 China Enterprise Absence Analysis System
2026-03-11 10:46:58 +08:00

106 lines
3.9 KiB
Python

import os
import joblib
import config
from core.feature_mining import get_correlation_for_heatmap, group_comparison
class AnalysisService:
    """Serve feature-importance, correlation and group-comparison analyses.

    Model artifacts are read lazily from ``config.MODELS_DIR`` the first time
    feature importances are requested. Correlation and group comparison are
    delegated to the functions imported from ``core.feature_mining``.
    """

    # Known model keys mapped to their artifact file names in config.MODELS_DIR.
    _MODEL_FILES = {
        'random_forest': 'random_forest_model.pkl',
        'xgboost': 'xgboost_model.pkl',
        'lightgbm': 'lightgbm_model.pkl',
        'gradient_boosting': 'gradient_boosting_model.pkl',
    }

    # Attribute-holding artifacts: (file name, attribute set on the instance).
    _NAME_ARTIFACTS = (
        ('feature_names.pkl', 'feature_names'),
        ('selected_features.pkl', 'selected_features'),
    )

    # Static ranking returned when no usable trained model is available.
    _DEFAULT_IMPORTANCES = (
        ('加班通勤压力指数', 0.24),
        ('健康风险指数', 0.18),
        ('请假类型', 0.12),
        ('通勤时长分钟', 0.1),
        ('月均加班时长', 0.08),
        ('近90天缺勤次数', 0.07),
        ('心理压力等级', 0.06),
        ('家庭负担指数', 0.05),
    )

    # Maximum number of features reported by get_feature_importance().
    _TOP_N = 15

    def __init__(self):
        """Create an empty service; no files are read until first use."""
        self.models = {}
        self.feature_names = None
        self.selected_features = None
        self.training_metadata = {}

    def _ensure_models_loaded(self):
        """Load metadata, models and feature-name artifacts if none are loaded.

        Returns immediately once ``self.models`` holds at least one model.
        Every artifact is optional: a missing file is skipped, and a file that
        fails to load is reported with ``print`` and skipped.

        Note: ``joblib.load`` unpickles the files, so ``config.MODELS_DIR``
        must only contain trusted artifacts.
        """
        if self.models:
            return

        metadata_path = os.path.join(config.MODELS_DIR, 'training_metadata.pkl')
        if os.path.exists(metadata_path):
            # Best-effort like the other artifacts: a corrupt metadata file
            # must not make every feature-importance request fail.
            try:
                self.training_metadata = joblib.load(metadata_path)
            except Exception as exc:
                print(f'Failed to load artifact training_metadata.pkl: {exc}')

        model_files = dict(self._MODEL_FILES)
        allowed_models = self.training_metadata.get('available_models')
        if allowed_models:
            # Restrict loading to the models the training run reported.
            model_files = {k: v for k, v in model_files.items() if k in allowed_models}

        for name, filename in model_files.items():
            path = os.path.join(config.MODELS_DIR, filename)
            if not os.path.exists(path):
                continue
            try:
                self.models[name] = joblib.load(path)
            except Exception as exc:
                print(f'Failed to load model {name}: {exc}')

        for filename, attr in self._NAME_ARTIFACTS:
            path = os.path.join(config.MODELS_DIR, filename)
            if not os.path.exists(path):
                continue
            try:
                setattr(self, attr, joblib.load(path))
            except Exception as exc:
                print(f'Failed to load artifact {filename}: {exc}')

    def _resolve_feature_names(self, count):
        """Return ``count`` names to pair with a model's importances.

        ``selected_features`` is preferred, then ``feature_names``; the first
        one whose length equals ``count`` wins. If neither matches, generic
        ``feature_<index>`` labels are generated.
        """
        for candidate in (self.selected_features, self.feature_names):
            # Compare lengths explicitly instead of truth-testing: a
            # multi-element numpy array raises ValueError in a boolean context.
            if candidate is not None and len(candidate) == count:
                return list(candidate)
        return [f'feature_{idx}' for idx in range(count)]

    def get_feature_importance(self, model_type='random_forest'):
        """Return the highest-ranked feature importances of a loaded model.

        Args:
            model_type: Key of the requested model. When that model is not
                loaded, the first loaded model is used instead.

        Returns:
            A dict with ``model_type`` (the key of the model actually used, or
            ``'default'``) and ``features``: up to 15 dicts holding ``name``,
            ``name_cn``, ``importance`` (rounded to 4 decimals) and a 1-based
            ``rank``, ordered by descending importance. The static default
            ranking is returned when no model is loaded or the chosen model
            exposes no ``feature_importances_``.
        """
        self._ensure_models_loaded()
        if model_type not in self.models:
            model_type = next(iter(self.models), 'default')
        if model_type == 'default':
            return self._get_default_importance()

        # getattr also covers an attribute that exists but is None.
        importances = getattr(self.models[model_type], 'feature_importances_', None)
        if importances is None:
            return self._get_default_importance()

        feature_names = self._resolve_feature_names(len(importances))
        ranked = sorted(
            zip(feature_names, importances),
            key=lambda item: item[1],
            reverse=True,
        )[:self._TOP_N]
        return {
            'model_type': model_type,
            'features': [
                {
                    'name': name,
                    'name_cn': config.FEATURE_NAME_CN.get(name, name),
                    'importance': round(float(importance), 4),
                    'rank': rank,
                }
                for rank, (name, importance) in enumerate(ranked, start=1)
            ],
        }

    def _get_default_importance(self):
        """Return the static fallback ranking in the same response shape."""
        return {
            'model_type': 'default',
            'features': [
                {
                    'name': name,
                    'name_cn': config.FEATURE_NAME_CN.get(name, name),
                    'importance': importance,
                    'rank': rank,
                }
                for rank, (name, importance) in enumerate(self._DEFAULT_IMPORTANCES, start=1)
            ],
        }

    def get_correlation(self):
        """Return the result of ``get_correlation_for_heatmap()`` unchanged."""
        return get_correlation_for_heatmap()

    def get_group_comparison(self, dimension):
        """Return the result of ``group_comparison(dimension)`` unchanged."""
        return group_comparison(dimension)
# Module-level instance created at import time. Construction only initialises
# empty attributes; model artifacts are read later, on the first call to
# get_feature_importance().
analysis_service = AnalysisService()