Files
forsetsystem/backend/core/clustering.py

183 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import MinMaxScaler
import config
from core.preprocessing import get_clean_data
class KMeansAnalyzer:
    """K-means segmentation of employees with human-readable cluster labels.

    Fits a scikit-learn ``KMeans`` on a fixed set of numeric features
    (min-max scaled), then derives Chinese display names and one-sentence
    descriptions for each cluster from the inverse-transformed centers.
    """

    def __init__(self, n_clusters=3):
        self.n_clusters = n_clusters  # k used on the next fit
        self.model = None             # fitted sklearn KMeans, None before fit()
        self.scaler = MinMaxScaler()  # fitted together with the model
        self.labels = None            # per-row cluster assignments from last fit
        # Feature columns, in the order expected by the center unpacking in
        # _generate_cluster_names: age, tenure (years), monthly overtime (h),
        # commute (min), BMI, absence duration (h).
        self.feature_cols = [
            '年龄',
            '司龄年数',
            '月均加班时长',
            '通勤时长分钟',
            'BMI',
            '缺勤时长(小时)',
        ]

    def fit(self, n_clusters=None):
        """Fit (or refit) the model on the cleaned data.

        Args:
            n_clusters: optional new k; when given, replaces ``self.n_clusters``.

        Returns:
            The fitted sklearn ``KMeans`` model.
        """
        # Explicit None check (was a truthiness test, which silently ignored 0).
        if n_clusters is not None:
            self.n_clusters = n_clusters
        df = get_clean_data().reset_index(drop=True)
        data_scaled = self.scaler.fit_transform(df[self.feature_cols].values)
        self.model = KMeans(
            n_clusters=self.n_clusters,
            random_state=config.RANDOM_STATE,
            n_init=10,
        )
        self.labels = self.model.fit_predict(data_scaled)
        return self.model

    def _ensure_fitted(self, n_clusters):
        """Lazily (re)fit when no model exists yet or the requested k differs."""
        if self.model is None or self.n_clusters != n_clusters:
            self.fit(n_clusters)

    def get_cluster_results(self, n_clusters=3):
        """Return per-cluster summary: name, size, share, center, description."""
        self._ensure_fitted(n_clusters)
        # Inverse-transform so centers are reported in original feature units.
        centers = self.scaler.inverse_transform(self.model.cluster_centers_)
        unique, counts = np.unique(self.labels, return_counts=True)
        total = len(self.labels)
        names = self._generate_cluster_names(centers)
        clusters = []
        for cluster_id, count in zip(unique, counts):
            cid = int(cluster_id)
            center = centers[cid]
            clusters.append({
                'id': cid,
                'name': names.get(cid, f'群体{cid + 1}'),
                'member_count': int(count),
                'percentage': round(count / total * 100, 1),
                'center': {
                    feature: round(float(value), 2)
                    for feature, value in zip(self.feature_cols, center)
                },
                'description': self._generate_description(names.get(cid, '')),
            })
        return {'n_clusters': self.n_clusters, 'clusters': clusters}

    def get_cluster_profile(self, n_clusters=3):
        """Return radar-chart data: scaled (0-1) center values per cluster."""
        self._ensure_fitted(n_clusters)
        centers_scaled = self.model.cluster_centers_
        # Names are derived from the unscaled centers; values stay scaled.
        names = self._generate_cluster_names(
            self.scaler.inverse_transform(centers_scaled)
        )
        return {
            'dimensions': ['年龄', '司龄', '加班', '通勤', 'BMI', '缺勤'],
            'dimension_keys': self.feature_cols,
            'clusters': [
                {
                    'id': idx,
                    'name': names.get(idx, f'群体{idx + 1}'),
                    'values': [round(float(v), 2) for v in centers_scaled[idx]],
                }
                for idx in range(self.n_clusters)
            ],
        }

    def get_scatter_data(self, n_clusters=3, x_axis='月均加班时长', y_axis='缺勤时长(小时)'):
        """Return up to 500 scatter points colored by cluster.

        Unknown axis columns fall back to defaults instead of raising.
        """
        self._ensure_fitted(n_clusters)
        df = get_clean_data().reset_index(drop=True)
        centers = self.scaler.inverse_transform(self.model.cluster_centers_)
        names = self._generate_cluster_names(centers)
        if x_axis not in df.columns:
            x_axis = '月均加班时长'
        if y_axis not in df.columns:
            y_axis = config.TARGET_COLUMN
        points = []
        # Guard against a labels array whose length differs from the dataframe.
        for idx in range(min(len(df), len(self.labels))):
            row = df.iloc[idx]
            points.append({
                'employee_id': str(row[config.EMPLOYEE_ID_COLUMN]),
                'x': float(row[x_axis]),
                'y': float(row[y_axis]),
                'cluster_id': int(self.labels[idx]),
            })
        return {
            'x_axis': x_axis,
            'x_axis_name': config.FEATURE_NAME_CN.get(x_axis, x_axis),
            'y_axis': y_axis,
            'y_axis_name': config.FEATURE_NAME_CN.get(y_axis, y_axis),
            'points': points[:500],
            'cluster_colors': {
                '0': '#5B8FF9',
                '1': '#61DDAA',
                '2': '#F6BD16',
                '3': '#E8684A',
                '4': '#6DC8EC',
            },
            'cluster_names': {
                str(idx): names.get(idx, f'群体{idx + 1}')
                for idx in range(self.n_clusters)
            },
        }

    def _generate_cluster_names(self, centers):
        """Map cluster index -> display name from unscaled centers.

        Thresholds are heuristic archetypes; when several clusters land on
        the same name, duplicates get distinguishing suffixes.
        """
        base_names = {}
        for idx, center in enumerate(centers):
            _, tenure, overtime, commute, bmi, absence = center
            if overtime > 38 and commute > 55 and absence > 8:
                base_names[idx] = '高压通勤型'
            elif bmi > 27 and absence > 8:
                base_names[idx] = '健康波动型'
            elif tenure > 8 and absence < 6:
                base_names[idx] = '稳定低风险型'
            elif overtime > 28 and absence > 7:
                base_names[idx] = '轮班负荷型'
            else:
                base_names[idx] = f'群体{idx + 1}'
        return self._deduplicate_cluster_names(base_names, centers)

    def _deduplicate_cluster_names(self, names, centers):
        """Append ranked suffixes to clusters that share the same base name."""
        grouped = {}
        for idx, name in names.items():
            grouped.setdefault(name, []).append(idx)
        deduplicated = names.copy()
        for name, indices in grouped.items():
            if len(indices) == 1:
                continue
            # Rank duplicates (highest risk signals first), then suffix in order.
            order = self._build_duplicate_order(indices, centers)
            suffixes = self._suffix_candidates(name)
            for rank, idx in enumerate(order):
                suffix = suffixes[rank] if rank < len(suffixes) else f'{rank + 1}'
                deduplicated[idx] = f'{name}{suffix}'
        return deduplicated

    def _build_duplicate_order(self, indices, centers):
        """Order duplicate clusters by descending risk-related center values."""
        return sorted(
            indices,
            key=lambda idx: (
                centers[idx][5],  # absence hours
                centers[idx][2],  # overtime
                centers[idx][1],  # tenure
                centers[idx][3],  # commute
                centers[idx][4],  # BMI
                centers[idx][0],  # age
            ),
            reverse=True,
        )

    def _suffix_candidates(self, name):
        """Return the suffix list for an archetype; numeric fallback otherwise."""
        suffix_map = {
            '高压通勤型': ['-高风险组', '-关注组', '-观察组'],
            '健康波动型': ['-重点关注组', '-预警组', '-观察组'],
            '稳定低风险型': ['-资深组', '-成熟组', '-稳健组'],
            '轮班负荷型': ['-高负荷组', '-轮班组', '-强化组'],
        }
        return suffix_map.get(name, [str(i) for i in range(1, 10)])

    def _generate_description(self, name):
        """Return the one-sentence description matching the name's archetype prefix."""
        descriptions = {
            '高压通勤型': '加班和通勤压力都高,缺勤时长偏长。',
            '健康波动型': '健康相关风险更高,需要重点关注。',
            '稳定低风险型': '司龄较长,缺勤水平稳定且偏低。',
            '轮班负荷型': '排班和工作负荷较重,缺勤风险较高。',
        }
        for key, description in descriptions.items():
            if name.startswith(key):
                return description
        # An exact-key dict lookup here was redundant: any exact match is
        # already caught by the startswith loop above.
        return '常规员工群体。'
kmeans_analyzer = KMeansAnalyzer()