"""Prediction service: loads persisted models and serves hour predictions."""

import os

import joblib
import numpy as np

import config
from core.model_features import (
    align_feature_frame,
    apply_label_encoders,
    build_prediction_dataframe,
    engineer_features,
    to_float_array,
)

# Display metadata for every model type the service knows how to load.
MODEL_INFO = {
    'random_forest': {'name': 'random_forest', 'name_cn': '随机森林', 'description': '稳健的树模型集成'},
    'xgboost': {'name': 'xgboost', 'name_cn': 'XGBoost', 'description': '梯度提升树模型'},
    'lightgbm': {'name': 'lightgbm', 'name_cn': 'LightGBM', 'description': '轻量级梯度提升树'},
    'gradient_boosting': {'name': 'gradient_boosting', 'name_cn': 'GBDT', 'description': '梯度提升决策树'},
    'extra_trees': {'name': 'extra_trees', 'name_cn': '极端随机树', 'description': '高随机性的树模型'},
    'stacking': {'name': 'stacking', 'name_cn': 'Stacking集成', 'description': '多模型融合'},
}


def _as_float(value, default):
    """Return ``float(value)``, or ``float(default)`` if it cannot be converted."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return float(default)


def _as_int(value, default):
    """Return ``int(value)``, or ``int(default)`` if it cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return int(default)


class PredictService:
    """Load trained models and preprocessing artifacts and run predictions.

    Artifacts are loaded lazily on first use. When no usable model or
    preprocessing artifact is available, predictions fall back to the
    rule-based estimate in ``_get_default_prediction``.
    """

    def __init__(self):
        """Initialise empty state; nothing is read from disk here."""
        self.models = {}
        self.scaler = None
        self.feature_names = None
        self.selected_features = None
        self.label_encoders = {}
        self.model_metrics = {}
        self.training_metadata = {}
        self.default_model = 'random_forest'

    def _ensure_models_loaded(self):
        """Load artifacts if no model is in memory yet.

        While no model could be loaded, every call retries, so model files
        that appear later are picked up without recreating the service.
        """
        if not self.models:
            self.load_models()

    def load_models(self):
        """Load models, scaler, and auxiliary artifacts from ``config.MODELS_DIR``.

        Missing files are skipped. A model, scaler, or auxiliary artifact
        that fails to load is reported and skipped so the remaining
        artifacts stay usable. Afterwards the default model becomes the
        loaded model with the highest recorded ``r2``.

        NOTE: ``joblib.load`` unpickles arbitrary objects; the models
        directory must only ever contain trusted files.
        """
        metadata_path = os.path.join(config.MODELS_DIR, 'training_metadata.pkl')
        if os.path.exists(metadata_path):
            # Deliberately unguarded: without the metadata the target
            # transform is unknown, so carrying on could return predictions
            # on the wrong scale.
            self.training_metadata = joblib.load(metadata_path)

        model_files = {
            'random_forest': 'random_forest_model.pkl',
            'xgboost': 'xgboost_model.pkl',
            'lightgbm': 'lightgbm_model.pkl',
            'gradient_boosting': 'gradient_boosting_model.pkl',
            'extra_trees': 'extra_trees_model.pkl',
            'stacking': 'stacking_model.pkl',
        }
        # When the metadata lists the available models, load only those.
        allowed_models = self.training_metadata.get('available_models')
        if allowed_models:
            model_files = {k: v for k, v in model_files.items() if k in allowed_models}

        for name, filename in model_files.items():
            path = os.path.join(config.MODELS_DIR, filename)
            if os.path.exists(path):
                try:
                    self.models[name] = joblib.load(path)
                except Exception as exc:
                    print(f'Failed to load model {name}: {exc}')

        if os.path.exists(config.SCALER_PATH):
            # Guarded like the other artifacts: with no scaler,
            # predict_single falls back to the rule-based estimate.
            try:
                self.scaler = joblib.load(config.SCALER_PATH)
            except Exception as exc:
                print(f'Failed to load artifact scaler: {exc}')

        for filename, attr in [
            ('feature_names.pkl', 'feature_names'),
            ('selected_features.pkl', 'selected_features'),
            ('label_encoders.pkl', 'label_encoders'),
            ('model_metrics.pkl', 'model_metrics'),
        ]:
            path = os.path.join(config.MODELS_DIR, filename)
            if os.path.exists(path):
                try:
                    setattr(self, attr, joblib.load(path))
                except Exception as exc:
                    print(f'Failed to load artifact {filename}: {exc}')

        # Only metrics of models that actually loaded may pick the default.
        valid_metrics = {
            key: value for key, value in self.model_metrics.items() if key in self.models
        }
        if valid_metrics:
            # A missing 'r2' counts as 0, matching how the listings treat it.
            self.default_model = max(
                valid_metrics,
                key=lambda name: valid_metrics[name].get('r2', 0),
            )

    def get_available_models(self):
        """Return display info for every loaded model, best ``r2`` first.

        Returns:
            A list of dicts based on ``MODEL_INFO`` entries, each extended
            with ``is_available``, ``is_default`` and ``metrics``.
        """
        self._ensure_models_loaded()
        models = []
        for name in self.models:
            info = MODEL_INFO.get(name, {'name': name, 'name_cn': name, 'description': ''}).copy()
            info['is_available'] = True
            info['is_default'] = name == self.default_model
            info['metrics'] = self.model_metrics.get(name, {'r2': 0, 'rmse': 0, 'mae': 0})
            models.append(info)
        # .get keeps the listing working if a metrics entry lacks 'r2'.
        models.sort(key=lambda item: item['metrics'].get('r2', 0), reverse=True)
        return models

    def predict_single(self, data, model_type=None):
        """Predict hours for one record.

        Args:
            data: Mapping of raw input fields for one record.
            model_type: Name of the model to use. Defaults to the service's
                default model; an unknown name falls back to the first
                loaded model.

        Returns:
            A dict with ``predicted_hours``, ``risk_level``, ``risk_label``,
            ``confidence``, ``model_used`` and ``model_name_cn``. The
            rule-based estimate is returned when no model, scaler, or
            feature list is available, or when the model call fails.
        """
        self._ensure_models_loaded()
        model_type = model_type or self.default_model
        if model_type not in self.models:
            fallback = next(iter(self.models), None)
            if fallback is None:
                return self._get_default_prediction(data)
            model_type = fallback

        if self.scaler is None or self.feature_names is None:
            return self._get_default_prediction(data)

        features = self._prepare_features(data)
        try:
            predicted_hours = self.models[model_type].predict([features])[0]
            predicted_hours = self._inverse_transform_prediction(predicted_hours)
            predicted_hours = max(0.5, float(predicted_hours))
        except Exception as exc:
            # Best effort: report the failure instead of hiding it, then
            # fall back to the rule-based estimate.
            print(f'Prediction with model {model_type} failed: {exc}')
            return self._get_default_prediction(data)

        risk_level, risk_label = self._get_risk_level(predicted_hours)
        # The model's recorded r2 doubles as the confidence, floored at 0.5.
        confidence = max(0.5, self.model_metrics.get(model_type, {}).get('r2', 0.82))
        return {
            'predicted_hours': round(predicted_hours, 2),
            'risk_level': risk_level,
            'risk_label': risk_label,
            'confidence': round(confidence, 2),
            'model_used': model_type,
            'model_name_cn': MODEL_INFO.get(model_type, {}).get('name_cn', model_type),
        }

    def predict_compare(self, data):
        """Run every loaded model on ``data`` and return the results.

        Returns:
            A list of ``predict_single`` results, each extended with
            ``model``, ``model_name_cn`` and ``r2``, sorted by ``r2``
            descending. The first entry is flagged ``recommended``.
        """
        self._ensure_models_loaded()
        results = []
        for name in self.models:
            result = self.predict_single(data, name)
            result['model'] = name
            result['model_name_cn'] = MODEL_INFO.get(name, {}).get('name_cn', name)
            result['r2'] = self.model_metrics.get(name, {}).get('r2', 0)
            results.append(result)
        results.sort(key=lambda item: item.get('r2', 0), reverse=True)
        if results:
            results[0]['recommended'] = True
        return results

    def _prepare_features(self, data):
        """Turn one raw record into the feature vector passed to a model.

        The record goes through the shared helpers (build frame, engineer
        features, apply label encoders, align to ``feature_names``) and is
        then scaled. When ``selected_features`` is set, only those columns
        are kept, in the order given by ``selected_features``.
        """
        X_df = build_prediction_dataframe(data)
        X_df = engineer_features(X_df)
        X_df = apply_label_encoders(X_df, self.label_encoders)
        X_df = align_feature_frame(X_df, self.feature_names)
        features = self.scaler.transform(to_float_array(X_df))[0]

        if self.selected_features:
            # Selection happens after scaling, on positions in feature_names.
            selected_indices = [
                self.feature_names.index(name)
                for name in self.selected_features
                if name in self.feature_names
            ]
            if selected_indices:
                features = features[selected_indices]
        return features

    def _inverse_transform_prediction(self, prediction):
        """Map a raw model output back to hours.

        Applies ``expm1`` when the training metadata records a ``log1p``
        target transform; otherwise returns the value unchanged as a float.
        """
        if self.training_metadata.get('target_transform') == 'log1p':
            return float(np.expm1(prediction))
        return float(prediction)

    def _get_risk_level(self, hours):
        """Classify hours: below 4 is low, up to 8 is medium, above is high."""
        if hours < 4:
            return 'low', '低风险'
        if hours <= 8:
            return 'medium', '中风险'
        return 'high', '高风险'

    def _get_default_prediction(self, data):
        """Return a rule-based estimate used when no model can be applied.

        This is the fallback of last resort, so numeric fields that are
        present but ``None`` or not convertible are treated as their
        default value instead of raising.
        """
        base_hours = 3.8
        base_hours += min(_as_float(data.get('monthly_overtime_hours', 24), 24) / 20, 3.0)
        base_hours += min(_as_float(data.get('commute_minutes', 40), 40) / 50, 2.0)

        # Binary flags and the surcharge each adds when set to 1.
        for field, extra in (
            ('is_night_shift', 1.6),
            ('chronic_disease_flag', 1.8),
            ('near_holiday_flag', 0.9),
            ('medical_certificate_flag', 0.8),
        ):
            if _as_int(data.get(field, 0), 0) == 1:
                base_hours += extra

        base_hours += 0.5 * _as_int(data.get('children_count', 0), 0)
        if data.get('leave_type') in ('病假', '工伤假', '婚假', '丧假'):
            base_hours += 2.5
        if data.get('stress_level') == '高':
            base_hours += 0.9
        if data.get('performance_level') == 'A':
            base_hours -= 0.5

        # Clamp first so the risk level describes the value actually returned.
        predicted_hours = max(0.5, base_hours)
        risk_level, risk_label = self._get_risk_level(predicted_hours)
        return {
            'predicted_hours': round(predicted_hours, 2),
            'risk_level': risk_level,
            'risk_label': risk_label,
            'confidence': 0.72,
            'model_used': 'default',
            'model_name_cn': '默认规则',
        }

    def get_model_info(self):
        """Return the available models plus summary fields from the training metadata."""
        self._ensure_models_loaded()
        return {
            'models': self.get_available_models(),
            'training_info': {
                'train_samples': self.training_metadata.get('train_samples', 0),
                'test_samples': self.training_metadata.get('test_samples', 0),
                'feature_count': self.training_metadata.get('feature_count_after_selection', 0),
                'training_date': self.training_metadata.get('training_date', ''),
            },
        }


# Module-level singleton shared by importers of this module.
predict_service = PredictService()