121 lines
4.0 KiB
Python
121 lines
4.0 KiB
Python
import importlib.util
|
|
import sys
|
|
import types
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
|
|
|
|
def load_clustering_module(module_path=None):
    """Load the clustering module from disk with its dependencies stubbed.

    The module is executed under the name ``test_clustering_module`` after
    ``config``, ``core.preprocessing``, ``sklearn``, ``sklearn.cluster`` and
    ``sklearn.preprocessing`` have been replaced in ``sys.modules`` by
    lightweight stand-ins, so the import statements inside the target file
    resolve to the stubs.

    Args:
        module_path: Optional location of the clustering source file
            (``str`` or ``Path``). When omitted, the original hard-coded
            project path is used.

    Returns:
        The executed module object.

    Raises:
        FileNotFoundError: If ``module_path`` does not point to a file.
        ImportError: If no import spec/loader can be built for the file.

    Note:
        The stub entries are left in ``sys.modules`` after a successful
        load, because the loaded module may still import from them lazily.
        Nothing is installed when the load cannot even start.
    """
    if module_path is None:
        module_path = Path(r'D:\forsetsystem\backend\core\clustering.py')
    module_path = Path(module_path)

    # Validate everything that can fail cheaply *before* touching
    # sys.modules, so a bad path does not leave the stubs behind.
    if not module_path.is_file():
        raise FileNotFoundError(f'clustering module not found: {module_path}')
    spec = importlib.util.spec_from_file_location('test_clustering_module', module_path)
    if spec is None or spec.loader is None:
        raise ImportError(f'cannot build an import spec for {module_path}')

    fake_config = types.SimpleNamespace(
        RANDOM_STATE=42,
        TARGET_COLUMN='缺勤时长(小时)',
        EMPLOYEE_ID_COLUMN='员工工号',
        FEATURE_NAME_CN={
            '月均加班时长': '月均加班时长',
            '缺勤时长(小时)': '缺勤时长(小时)',
        },
    )

    fake_preprocessing = types.ModuleType('core.preprocessing')
    fake_preprocessing.get_clean_data = lambda: None

    fake_sklearn = types.ModuleType('sklearn')
    fake_sklearn_cluster = types.ModuleType('sklearn.cluster')
    fake_sklearn_preprocessing = types.ModuleType('sklearn.preprocessing')

    class DummyKMeans:
        """KMeans stand-in: every sample is a center and gets label 0."""

        def __init__(self, *args, **kwargs):
            self.cluster_centers_ = None

        def fit_predict(self, data):
            self.cluster_centers_ = np.asarray(data, dtype=float)
            return np.zeros(len(data), dtype=int)

    class DummyMinMaxScaler:
        """MinMaxScaler stand-in: both transforms only cast to float."""

        def fit_transform(self, data):
            return np.asarray(data, dtype=float)

        def inverse_transform(self, data):
            return np.asarray(data, dtype=float)

    fake_sklearn_cluster.KMeans = DummyKMeans
    fake_sklearn_preprocessing.MinMaxScaler = DummyMinMaxScaler

    sys.modules.update({
        'config': fake_config,
        'core.preprocessing': fake_preprocessing,
        'sklearn': fake_sklearn,
        'sklearn.cluster': fake_sklearn_cluster,
        'sklearn.preprocessing': fake_sklearn_preprocessing,
    })

    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
|
|
|
|
|
|
class ClusterNamingTests(unittest.TestCase):
    """Exercise the naming and description helpers of ``KMeansAnalyzer``."""

    @classmethod
    def setUpClass(cls):
        # A single analyzer instance is shared by every test in the class.
        clustering = load_clustering_module()
        cls.analyzer = clustering.KMeansAnalyzer()

    def test_generate_cluster_names_avoids_generic_group_names(self):
        """Four centers yield four names, none containing '群体'."""
        centroid_rows = [
            [41, 11, 18, 28, 22.5, 4.2],
            [30, 3, 22, 33, 23.0, 5.8],
            [36, 7, 36, 52, 24.0, 8.6],
            [38, 6, 24, 31, 27.2, 8.1],
        ]
        centers = np.array(centroid_rows, dtype=float)

        generated = self.analyzer._generate_cluster_names(centers)

        self.assertEqual(len(generated), 4)
        for label in generated.values():
            self.assertNotIn('群体', label)

    def test_generate_cluster_names_returns_business_labels(self):
        """Each expected business label appears among the generated names."""
        centroid_rows = [
            [42, 10, 16, 26, 22.0, 4.1],
            [29, 2, 20, 30, 22.8, 5.6],
            [35, 6, 34, 50, 24.1, 8.8],
            [37, 7, 23, 29, 27.5, 8.0],
        ]
        centers = np.array(centroid_rows, dtype=float)

        generated = self.analyzer._generate_cluster_names(centers)

        # Checked in the same order as before so the first failure matches.
        for expected in ('稳定成熟型', '新锐成长型', '压力奔波型', '健康关注型'):
            self.assertIn(expected, generated.values())

    def test_duplicate_names_receive_natural_suffixes(self):
        """Colliding names get distinct suffixes; unique names are untouched."""
        centers = np.array(
            [
                [44, 12, 18, 29, 22.2, 4.0],
                [39, 9, 20, 34, 23.1, 5.3],
                [32, 4, 31, 46, 24.8, 7.2],
            ],
            dtype=float,
        )
        colliding = {0: '稳定成熟型', 1: '稳定成熟型', 2: '负荷承压型'}

        resolved = self.analyzer._deduplicate_cluster_names(colliding, centers)

        renamed_pair = {resolved[0], resolved[1]}
        self.assertEqual(renamed_pair, {'稳定成熟型-资深组', '稳定成熟型-成熟组'})
        self.assertEqual(resolved[2], '负荷承压型')

    def test_description_reflects_center_traits(self):
        """The description text mentions each trait phrase expected here."""
        center = np.array([34, 5, 36, 52, 24.0, 8.3], dtype=float)

        description = self.analyzer._generate_description('压力奔波型', center)

        for phrase in ('加班负荷偏高', '通勤压力偏高', '缺勤时长偏高'):
            self.assertIn(phrase, description)
|
|
|
|
|
|
# Run the suite through unittest's command-line runner when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    unittest.main()
|