import config
from core.preprocessing import get_clean_data


class DataService:
    """Summary statistics over the cleaned data set returned by ``get_clean_data``.

    The data frame is loaded on first access and then reused for every
    query. All public methods return plain dictionaries built from the
    column named by ``config.TARGET_COLUMN``.
    """

    # A record counts toward ``high_risk_ratio`` when its target value is
    # strictly greater than this threshold.
    HIGH_RISK_HOURS = 8

    def __init__(self):
        self._df = None

    @property
    def df(self):
        """Return the cleaned data frame, loading it on first access."""
        if self._df is None:
            self._df = get_clean_data()
        return self._df

    def _grouped_target_stats(self, column):
        """Group by *column* and aggregate the target column.

        Returns a frame with the columns ``key`` (the group value),
        ``total_hours`` (sum), ``avg_hours`` (mean) and ``record_count``
        (pandas ``count``, i.e. the number of non-null target values).
        """
        stats = (
            self.df.groupby(column)
            .agg({config.TARGET_COLUMN: ['sum', 'mean', 'count']})
            .reset_index()
        )
        stats.columns = ['key', 'total_hours', 'avg_hours', 'record_count']
        return stats

    def _series_for_codes(self, column, codes):
        """Build aligned total/avg/count lists for a fixed sequence of codes.

        Codes that have no matching group contribute ``0`` to every list,
        so the returned lists always have one entry per code.
        """
        stats = self._grouped_target_stats(column)
        series = {'total_hours': [], 'avg_hours': [], 'record_counts': []}
        for code in codes:
            match = stats[stats['key'] == code]
            if len(match):
                series['total_hours'].append(
                    round(float(match['total_hours'].values[0]), 1))
                series['avg_hours'].append(
                    round(float(match['avg_hours'].values[0]), 2))
                series['record_counts'].append(
                    int(match['record_count'].values[0]))
            else:
                series['total_hours'].append(0)
                series['avg_hours'].append(0)
                series['record_counts'].append(0)
        return series

    def get_basic_stats(self) -> dict:
        """Return headline figures for the whole data set.

        Keys: ``total_records``, ``total_employees``, ``avg_absent_hours``,
        ``max_absent_hours``, ``min_absent_hours``, ``high_risk_ratio`` and
        ``industries_covered``.
        """
        df = self.df
        target = df[config.TARGET_COLUMN]
        total_records = len(df)
        high_risk_count = len(df[target > self.HIGH_RISK_HOURS])
        # An empty data set would otherwise raise ZeroDivisionError here.
        if total_records:
            high_risk_ratio = round(high_risk_count / total_records, 4)
        else:
            high_risk_ratio = 0.0
        return {
            'total_records': total_records,
            'total_employees': df[config.EMPLOYEE_ID_COLUMN].nunique(),
            'avg_absent_hours': round(target.mean(), 2),
            'max_absent_hours': round(float(target.max()), 1),
            'min_absent_hours': round(float(target.min()), 1),
            'high_risk_ratio': high_risk_ratio,
            'industries_covered': int(df['所属行业'].nunique()),
        }

    def get_monthly_trend(self) -> dict:
        """Return per-month totals, averages and record counts.

        Groups by the ``缺勤月份`` column and always emits twelve entries
        (codes 1 through 12); months with no group are reported as zeros.
        """
        months = range(1, 13)
        return {
            'months': [f'{month}月' for month in months],
            **self._series_for_codes('缺勤月份', months),
        }

    def get_weekday_distribution(self) -> dict:
        """Return per-weekday totals, averages and record counts.

        Groups by the ``星期几`` column and always emits seven entries
        (codes 1 through 7). Display names come from
        ``config.WEEKDAY_NAMES``, falling back to the code as a string.
        """
        codes = list(range(1, 8))
        return {
            'weekdays': [config.WEEKDAY_NAMES.get(code, str(code)) for code in codes],
            'weekday_codes': codes,
            **self._series_for_codes('星期几', codes),
        }

    def get_reason_distribution(self) -> dict:
        """Return record counts per ``请假原因大类`` value, largest first.

        Each entry carries ``name``, ``count`` and ``percentage`` (share of
        the summed counts, in percent, rounded to one decimal place).
        """
        reason = (
            self.df.groupby('请假原因大类')
            .agg({config.TARGET_COLUMN: 'count'})
            .reset_index()
        )
        reason.columns = ['name', 'count']
        reason = reason.sort_values('count', ascending=False)
        total = reason['count'].sum()
        return {
            'reasons': [
                {
                    'name': row['name'],
                    'count': int(row['count']),
                    'percentage': round(float(row['count']) / total * 100, 1),
                }
                for _, row in reason.iterrows()
            ]
        }

    def get_season_distribution(self) -> dict:
        """Return totals, averages, counts and shares per ``季节`` code.

        Only codes 1 through 4 are reported, and a code with no group is
        omitted rather than zero-filled. ``percentage`` is measured against
        the summed count of every group, including any outside 1 through 4.
        """
        season = self._grouped_target_stats('季节')
        total_records = season['record_count'].sum()
        result = {'seasons': []}
        for code in [1, 2, 3, 4]:
            row = season[season['key'] == code]
            if not len(row):
                continue
            record_count = row['record_count'].values[0]
            result['seasons'].append({
                'code': code,
                'name': config.SEASON_NAMES.get(code, f'季节{code}'),
                'total_hours': round(float(row['total_hours'].values[0]), 1),
                'avg_hours': round(float(row['avg_hours'].values[0]), 2),
                'record_count': int(record_count),
                'percentage': round(float(record_count) / total_records * 100, 1),
            })
        return result


# Module-level shared instance.
data_service = DataService()