import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler

import config
from core.preprocessing import get_clean_data


class KMeansAnalyzer:
    """K-means clustering over employee features, producing named clusters
    and chart-ready payloads (summary table, radar profile, scatter plot).

    Fitting is lazy: every ``get_*`` accessor refits when the model is
    missing or was last fitted with a different ``n_clusters``.
    """

    def __init__(self, n_clusters=3):
        self.n_clusters = n_clusters
        self.model = None               # fitted sklearn KMeans, None before fit()
        self.scaler = MinMaxScaler()    # maps raw features to [0, 1] before clustering
        self.labels = None              # per-row cluster assignments from last fit()
        # Columns read from the cleaned dataframe. The Chinese names are part
        # of the upstream data schema — do not translate or reorder them
        # (order must match the unpacking in _generate_cluster_names).
        self.feature_cols = [
            '年龄',
            '司龄年数',
            '月均加班时长',
            '通勤时长分钟',
            'BMI',
            '缺勤时长(小时)',
        ]

    def fit(self, n_clusters=None):
        """Fit K-means on the min-max-scaled feature matrix.

        Args:
            n_clusters: optional override for the cluster count; replaces
                ``self.n_clusters`` when given (i.e. not None).

        Returns:
            The fitted ``sklearn.cluster.KMeans`` model.
        """
        # Was `if n_clusters:` — a truthiness test that silently ignored 0,
        # making it indistinguishable from "not provided". Explicit None
        # check keeps the override semantics unambiguous.
        if n_clusters is not None:
            self.n_clusters = n_clusters
        df = get_clean_data().reset_index(drop=True)
        data = df[self.feature_cols].values
        data_scaled = self.scaler.fit_transform(data)
        self.model = KMeans(
            n_clusters=self.n_clusters,
            random_state=config.RANDOM_STATE,
            n_init=10,
        )
        self.labels = self.model.fit_predict(data_scaled)
        return self.model

    def _ensure_fitted(self, n_clusters):
        """Refit when no model exists or it was fitted with a different k."""
        if self.model is None or self.n_clusters != n_clusters:
            self.fit(n_clusters)

    def get_cluster_results(self, n_clusters=3):
        """Return per-cluster summary: name, size, share, raw-scale center.

        Returns:
            dict with ``n_clusters`` and a ``clusters`` list; each entry has
            ``id``, ``name``, ``member_count``, ``percentage``, ``center``
            (feature -> rounded raw value) and ``description``.
        """
        self._ensure_fitted(n_clusters)
        # Centers back in original feature units for human consumption.
        centers = self.scaler.inverse_transform(self.model.cluster_centers_)
        unique, counts = np.unique(self.labels, return_counts=True)
        total = len(self.labels)
        names = self._generate_cluster_names(centers)
        clusters = []
        for cluster_id, count in zip(unique, counts):
            cid = int(cluster_id)
            center = centers[cid]
            # _generate_cluster_names covers every index, so the fallback is
            # defensive only; computing the name once also fixes the original
            # inconsistency of using '' as the fallback for the description.
            name = names.get(cid, f'群体{cid + 1}')
            clusters.append({
                'id': cid,
                'name': name,
                'member_count': int(count),
                'percentage': round(count / total * 100, 1),
                'center': {
                    feature: round(float(value), 2)
                    for feature, value in zip(self.feature_cols, center)
                },
                'description': self._generate_description(name),
            })
        return {'n_clusters': self.n_clusters, 'clusters': clusters}

    def get_cluster_profile(self, n_clusters=3):
        """Return scaled cluster centers for a radar-style profile chart.

        Values are kept in the scaled [0, 1] space so all axes share one
        range; names are derived from the inverse-transformed centers.
        """
        self._ensure_fitted(n_clusters)
        centers_scaled = self.model.cluster_centers_
        names = self._generate_cluster_names(
            self.scaler.inverse_transform(centers_scaled)
        )
        return {
            'dimensions': ['年龄', '司龄', '加班', '通勤', 'BMI', '缺勤'],
            'dimension_keys': self.feature_cols,
            'clusters': [
                {
                    'id': idx,
                    'name': names.get(idx, f'群体{idx + 1}'),
                    'values': [round(float(v), 2) for v in centers_scaled[idx]],
                }
                for idx in range(self.n_clusters)
            ],
        }

    def get_scatter_data(self, n_clusters=3, x_axis='月均加班时长', y_axis='缺勤时长(小时)'):
        """Return up to 500 scatter points labeled with their cluster id.

        Unknown axis names fall back to defaults rather than raising
        (x falls back to the overtime column, y to config.TARGET_COLUMN).
        """
        self._ensure_fitted(n_clusters)
        df = get_clean_data().reset_index(drop=True)
        if x_axis not in df.columns:
            x_axis = '月均加班时长'
        if y_axis not in df.columns:
            y_axis = config.TARGET_COLUMN
        points = []
        # min() guards against a row-count mismatch if the cleaned data
        # changed between fit() and this call (labels would be stale).
        for idx in range(min(len(df), len(self.labels))):
            row = df.iloc[idx]
            points.append({
                'employee_id': str(row[config.EMPLOYEE_ID_COLUMN]),
                'x': float(row[x_axis]),
                'y': float(row[y_axis]),
                'cluster_id': int(self.labels[idx]),
            })
        return {
            'x_axis': x_axis,
            'x_axis_name': config.FEATURE_NAME_CN.get(x_axis, x_axis),
            'y_axis': y_axis,
            'y_axis_name': config.FEATURE_NAME_CN.get(y_axis, y_axis),
            'points': points[:500],
            'cluster_colors': {
                '0': '#5B8FF9',
                '1': '#61DDAA',
                '2': '#F6BD16',
                '3': '#E8684A',
                '4': '#6DC8EC',
            },
        }

    def _generate_cluster_names(self, centers):
        """Assign a heuristic Chinese label to each cluster center.

        ``centers`` must be in original (inverse-transformed) units and in
        ``feature_cols`` order; the thresholds are domain-tuned cut-offs on
        those raw values. Every index gets a name (generic fallback).
        """
        names = {}
        for idx, center in enumerate(centers):
            # Unpacking order mirrors feature_cols; age is unused here.
            _, tenure, overtime, commute, bmi, absence = center
            if overtime > 38 and commute > 55 and absence > 8:
                names[idx] = '高压通勤型'
            elif bmi > 27 and absence > 8:
                names[idx] = '健康波动型'
            elif tenure > 8 and absence < 6:
                names[idx] = '稳定低风险型'
            elif overtime > 28 and absence > 7:
                names[idx] = '轮班负荷型'
            else:
                names[idx] = f'群体{idx + 1}'
        return names

    def _generate_description(self, name):
        """Map a cluster name to its one-line description, with a generic
        fallback for unrecognized names."""
        descriptions = {
            '高压通勤型': '加班和通勤压力都高,缺勤时长偏长。',
            '健康波动型': '健康相关风险更高,需要重点关注。',
            '稳定低风险型': '司龄较长,缺勤水平稳定且偏低。',
            '轮班负荷型': '排班和工作负荷较重,缺勤风险较高。',
        }
        return descriptions.get(name, '常规员工群体。')


# Shared module-level singleton, presumably consumed by API handlers.
kmeans_analyzer = KMeansAnalyzer()