import os

import numpy as np
import joblib

import config

# Static, display-oriented metadata for every model this service can serve.
# NOTE: 'name_cn' / 'description' values are user-facing strings — do not alter.
MODEL_INFO = {
    'random_forest': {
        'name': 'random_forest',
        'name_cn': '随机森林',
        'description': '基于决策树的集成学习算法'
    },
    'xgboost': {
        'name': 'xgboost',
        'name_cn': 'XGBoost',
        'description': '高效的梯度提升算法'
    },
    'lightgbm': {
        'name': 'lightgbm',
        'name_cn': 'LightGBM',
        'description': '微软轻量级梯度提升框架'
    },
    'gradient_boosting': {
        'name': 'gradient_boosting',
        'name_cn': 'GBDT',
        'description': '梯度提升决策树'
    },
    'extra_trees': {
        'name': 'extra_trees',
        'name_cn': '极端随机树',
        'description': '随机森林的变体,随机性更强'
    },
    'stacking': {
        'name': 'stacking',
        'name_cn': 'Stacking集成',
        'description': '多层堆叠集成学习'
    }
}


class PredictService:
    """Absenteeism-hours prediction service.

    Lazily loads pickled models and preprocessing artifacts from
    ``config.MODELS_DIR``, exposes single-model and compare-all prediction,
    and falls back to a hand-written rule model when no trained artifacts
    are available.
    """

    def __init__(self):
        # name -> fitted estimator (populated lazily by load_models)
        self.models = {}
        # Fitted scaler applied to the full feature vector; None until loaded.
        self.scaler = None
        # Ordered list of feature names the scaler/models were trained on.
        self.feature_names = None
        # Optional subset of feature_names used by the final models.
        self.selected_features = None
        # name -> fitted LabelEncoder for categorical features.
        self.label_encoders = {}
        # name -> {'r2': ..., 'rmse': ..., 'mae': ...} from training.
        self.model_metrics = {}
        self.default_model = 'random_forest'

    def _ensure_models_loaded(self):
        """Load artifacts on first use (lazy initialization)."""
        if not self.models:
            self.load_models()

    def load_models(self):
        """Load every available model and preprocessing artifact from disk.

        Missing files are skipped silently; a corrupt model file is logged
        and skipped.  After loading, the default model is switched to the
        loaded model with the highest recorded R².
        """
        model_files = {
            'random_forest': 'random_forest_model.pkl',
            'xgboost': 'xgboost_model.pkl',
            'lightgbm': 'lightgbm_model.pkl',
            'gradient_boosting': 'gradient_boosting_model.pkl',
            'extra_trees': 'extra_trees_model.pkl',
            'stacking': 'stacking_model.pkl'
        }
        for name, filename in model_files.items():
            model_path = os.path.join(config.MODELS_DIR, filename)
            if os.path.exists(model_path):
                try:
                    self.models[name] = joblib.load(model_path)
                    print(f"Loaded {name} model")
                except Exception as e:
                    print(f"Failed to load {name}: {e}")

        if os.path.exists(config.SCALER_PATH):
            self.scaler = joblib.load(config.SCALER_PATH)

        feature_names_path = os.path.join(config.MODELS_DIR, 'feature_names.pkl')
        if os.path.exists(feature_names_path):
            self.feature_names = joblib.load(feature_names_path)

        selected_features_path = os.path.join(config.MODELS_DIR, 'selected_features.pkl')
        if os.path.exists(selected_features_path):
            self.selected_features = joblib.load(selected_features_path)

        label_encoders_path = os.path.join(config.MODELS_DIR, 'label_encoders.pkl')
        if os.path.exists(label_encoders_path):
            self.label_encoders = joblib.load(label_encoders_path)

        metrics_path = os.path.join(config.MODELS_DIR, 'model_metrics.pkl')
        if os.path.exists(metrics_path):
            self.model_metrics = joblib.load(metrics_path)

        if self.model_metrics:
            # Only consider metrics for models that actually loaded.
            valid_metrics = {k: v for k, v in self.model_metrics.items()
                             if k in self.models}
            if valid_metrics:
                # FIX: use .get('r2', 0) so a metrics entry without 'r2'
                # cannot raise KeyError (all other accesses default too).
                best_model = max(valid_metrics.items(),
                                 key=lambda x: x[1].get('r2', 0))
                self.default_model = best_model[0]

    def get_available_models(self):
        """Return display info for each loaded model, best R² first."""
        self._ensure_models_loaded()
        models = []
        for name in self.models.keys():
            info = MODEL_INFO.get(name, {
                'name': name,
                'name_cn': name,
                'description': ''
            }).copy()
            info['is_available'] = True
            info['is_default'] = (name == self.default_model)
            if name in self.model_metrics:
                info['metrics'] = self.model_metrics[name]
            else:
                info['metrics'] = {'r2': 0, 'rmse': 0, 'mae': 0}
            models.append(info)
        models.sort(key=lambda x: x['metrics']['r2'], reverse=True)
        return models

    def predict_single(self, data, model_type=None):
        """Predict absenteeism hours for one input record.

        Args:
            data: dict of raw input fields (see _prepare_features for keys).
            model_type: model name to use; defaults to the best loaded model.
                If the requested model is unavailable, falls back to any
                loaded model, then to the rule-based default.

        Returns:
            dict with predicted_hours, risk_level/label, confidence,
            model_used and model_name_cn.
        """
        self._ensure_models_loaded()

        if model_type is None:
            model_type = self.default_model
        if model_type not in self.models:
            available = list(self.models.keys())
            if available:
                model_type = available[0]
            else:
                return self._get_default_prediction(data)

        model = self.models[model_type]
        # Without the scaler/feature order we cannot build a valid input row.
        if self.scaler is None or self.feature_names is None:
            return self._get_default_prediction(data)

        features = self._prepare_features(data)
        try:
            predicted_hours = model.predict([features])[0]
            predicted_hours = max(0, float(predicted_hours))
        except Exception as e:
            print(f"Prediction error: {e}")
            return self._get_default_prediction(data)

        risk_level, risk_label = self._get_risk_level(predicted_hours)

        # Confidence is derived from the model's test R², floored at 0.5.
        confidence = 0.85
        if model_type in self.model_metrics:
            confidence = max(0.5, self.model_metrics[model_type].get('r2', 0.85))

        return {
            'predicted_hours': round(predicted_hours, 2),
            'risk_level': risk_level,
            'risk_label': risk_label,
            'confidence': round(confidence, 2),
            'model_used': model_type,
            'model_name_cn': MODEL_INFO.get(model_type, {}).get('name_cn', model_type)
        }

    def predict_compare(self, data):
        """Run every loaded model on the same record; best R² first.

        The top entry is flagged ``recommended``.
        """
        self._ensure_models_loaded()
        results = []
        for name in self.models.keys():
            try:
                result = self.predict_single(data, name)
                result['model'] = name
                result['model_name_cn'] = MODEL_INFO.get(name, {}).get('name_cn', name)
                if name in self.model_metrics:
                    result['r2'] = self.model_metrics[name]['r2']
                else:
                    result['r2'] = 0
                results.append(result)
            except Exception as e:
                print(f"Compare error for {name}: {e}")
        results.sort(key=lambda x: x.get('r2', 0), reverse=True)
        if results:
            results[0]['recommended'] = True
        return results

    def _prepare_features(self, data):
        """Build the scaled (and optionally selected) model input vector.

        Maps the raw request fields onto the training feature names,
        computes the engineered features used at training time, label-encodes
        categoricals, scales, and finally applies feature selection.
        Defaults below mirror the dataset medians/typical values used in
        training — TODO(review): confirm against the training pipeline.
        """
        feature_map = {
            'Reason for absence': data.get('reason_for_absence', 23),
            'Month of absence': data.get('month_of_absence', 7),
            'Day of the week': data.get('day_of_week', 3),
            'Seasons': data.get('seasons', 1),
            'Transportation expense': data.get('transportation_expense', 200),
            'Distance from Residence to Work': data.get('distance', 20),
            'Service time': data.get('service_time', 5),
            'Age': data.get('age', 30),
            'Work load Average/day': data.get('work_load', 250),
            'Hit target': data.get('hit_target', 95),
            'Disciplinary failure': data.get('disciplinary_failure', 0),
            'Education': data.get('education', 1),
            'Son': data.get('son', 0),
            'Social drinker': data.get('social_drinker', 0),
            'Social smoker': data.get('social_smoker', 0),
            'Pet': data.get('pet', 0),
            'Body mass index': data.get('bmi', 25)
        }

        age = feature_map['Age']
        service_time = feature_map['Service time']
        work_load = feature_map['Work load Average/day']
        distance = feature_map['Distance from Residence to Work']
        expense = feature_map['Transportation expense']
        bmi = feature_map['Body mass index']
        son = feature_map['Son']
        pet = feature_map['Pet']
        social_drinker = feature_map['Social drinker']
        social_smoker = feature_map['Social smoker']
        hit_target = feature_map['Hit target']
        seasons = feature_map['Seasons']
        day_of_week = feature_map['Day of the week']

        # Engineered features — must match the training pipeline exactly.
        # +1 denominators guard against division by zero.
        derived_features = {
            'workload_per_age': work_load / (age + 1),
            'expense_per_distance': expense / (distance + 1),
            'age_service_ratio': age / (service_time + 1),
            'has_children': 1 if son > 0 else 0,
            'has_pet': 1 if pet > 0 else 0,
            'family_responsibility': son + pet,
            'health_risk': 1 if (social_drinker == 1 or social_smoker == 1 or bmi > 30) else 0,
            'lifestyle_risk': int(social_drinker) + int(social_smoker),
            'age_group': 1 if age <= 30 else (2 if age <= 40 else (3 if age <= 50 else 4)),
            'service_group': 1 if service_time <= 5 else (2 if service_time <= 10 else (3 if service_time <= 20 else 4)),
            'bmi_category': 1 if bmi <= 18.5 else (2 if bmi <= 25 else (3 if bmi <= 30 else 4)),
            'workload_category': 1 if work_load <= 200 else (2 if work_load <= 250 else (3 if work_load <= 300 else 4)),
            'commute_category': 1 if distance <= 10 else (2 if distance <= 20 else (3 if distance <= 50 else 4)),
            'seasonal_risk': 1 if seasons in [1, 3] else 0,
            'weekday_risk': 1 if day_of_week in [2, 6] else 0,
            'hit_target_ratio': hit_target / 100,
            'experience_level': 1 if service_time <= 5 else (2 if service_time <= 10 else (3 if service_time <= 15 else 4)),
            'age_workload_interaction': age * work_load / 10000,
            'service_bmi_interaction': service_time * bmi / 100
        }

        all_features = {**feature_map, **derived_features}

        features = []
        for fname in self.feature_names:
            if fname in all_features:
                val = all_features[fname]
                if fname in self.label_encoders:
                    try:
                        val = self.label_encoders[fname].transform([str(val)])[0]
                    except Exception:
                        # FIX: was a bare except; unseen category -> 0.
                        val = 0
                features.append(float(val))
            else:
                # Feature unknown to this request: neutral zero fill.
                features.append(0.0)

        features = np.array(features).reshape(1, -1)
        features = self.scaler.transform(features)[0]

        # FIX: explicit None/len check — the original truthiness test raises
        # ValueError if the pickle holds a numpy array instead of a list.
        if self.selected_features is not None and len(self.selected_features) > 0:
            # FIX: precompute name->index once instead of O(n) .index() per
            # selected feature (was quadratic).
            name_to_idx = {n: i for i, n in enumerate(self.feature_names)}
            selected_indices = [name_to_idx[sf] for sf in self.selected_features
                                if sf in name_to_idx]
            if selected_indices:
                features = features[selected_indices]

        return features

    def _get_risk_level(self, hours):
        """Map predicted hours to a (level, label) pair: <4 low, <=8 medium."""
        if hours < 4:
            return 'low', '低风险'
        elif hours <= 8:
            return 'medium', '中风险'
        else:
            return 'high', '高风险'

    def _get_default_prediction(self, data):
        """Rule-based fallback used when no trained model is usable.

        Starts from a 5-hour baseline and applies hand-tuned adjustments
        per risk factor; result is floored at 0.5 hours.
        """
        base_hours = 5.0

        expense = data.get('transportation_expense', 200)
        if expense > 300:
            base_hours += 1.0
        elif expense < 150:
            base_hours -= 0.5

        distance = data.get('distance', 20)
        if distance > 40:
            base_hours += 1.5
        elif distance > 25:
            base_hours += 0.8

        service_time = data.get('service_time', 5)
        if service_time < 3:
            base_hours += 0.5
        elif service_time > 15:
            base_hours -= 0.5

        age = data.get('age', 30)
        if age > 50:
            base_hours += 0.5
        elif age < 25:
            base_hours += 0.3

        work_load = data.get('work_load', 250)
        if work_load > 300:
            base_hours += 1.5
        elif work_load > 260:
            base_hours += 0.5

        bmi = data.get('bmi', 25)
        if bmi > 30:
            base_hours += 0.8
        elif bmi < 20:
            base_hours += 0.3

        if data.get('social_drinker', 0) == 1:
            base_hours += 0.8
        if data.get('social_smoker', 0) == 1:
            base_hours += 0.5

        son = data.get('son', 0)
        if son > 0:
            base_hours += 0.3 * son

        pet = data.get('pet', 0)
        if pet > 0:
            base_hours -= 0.1 * pet

        hit_target = data.get('hit_target', 95)
        if hit_target < 90:
            base_hours += 0.5

        base_hours = max(0.5, base_hours)
        risk_level, risk_label = self._get_risk_level(base_hours)

        return {
            'predicted_hours': round(base_hours, 2),
            'risk_level': risk_level,
            'risk_label': risk_label,
            'confidence': 0.75,
            'model_used': 'default',
            'model_name_cn': '默认规则'
        }

    def get_model_info(self):
        """Return model list plus static training-run info for display.

        NOTE(review): train/test sample counts and training_date are
        hard-coded snapshots — confirm they match the actual training run.
        """
        self._ensure_models_loaded()
        models = self.get_available_models()
        return {
            'models': models,
            'training_info': {
                'train_samples': 2884,
                'test_samples': 722,
                'feature_count': len(self.feature_names) if self.feature_names else 20,
                'training_date': '2026-03-08'
            }
        }


# Module-level singleton used by the application's request handlers.
predict_service = PredictService()