# Listing metadata from the code viewer: 300 lines, 10 KiB, Python
import os
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
|
|
|
|
import config
|
|
from core.model_features import engineer_features
|
|
|
|
try:
|
|
import torch
|
|
import torch.nn as nn
|
|
from torch.utils.data import DataLoader, TensorDataset
|
|
except ImportError:
|
|
torch = None
|
|
nn = None
|
|
DataLoader = None
|
|
TensorDataset = None
|
|
|
|
|
|
# Number of events (the current one included) in each LSTM input window;
# shorter histories are zero-padded at the front by the window builder.
WINDOW_SIZE = 5

# Per-event columns fed to the LSTM branch, in model input order.
SEQUENCE_FEATURES = [
    # When the absence happened: month, weekday, near a public holiday.
    '缺勤月份', '星期几', '是否节假日前后',
    # What kind of leave: type, reason group, hospital certificate,
    # ad-hoc request, part of a consecutive absence.
    '请假类型', '请假原因大类', '是否提供医院证明', '是否临时请假', '是否连续缺勤',
    # Workload context: overtime on the previous workday, monthly average
    # overtime, commute minutes, night-shift post.
    '前一工作日是否加班', '月均加班时长', '通勤时长分钟', '是否夜班岗位',
    # Health and derived indices: chronic disease history, overtime/commute
    # stress index, absence history intensity.
    '是否慢性病史', '加班通勤压力指数', '缺勤历史强度',
]

# Per-employee columns fed to the dense (MLP) branch, in model input order.
STATIC_FEATURES = [
    # Employment profile: industry, marital status, job family, job level.
    '所属行业', '婚姻状态', '岗位序列', '岗位级别',
    # Demographics: age, years of tenure, number of children.
    '年龄', '司龄年数', '子女数量',
    # Work pattern and rating: shift type, performance grade.
    '班次类型', '绩效等级',
    # Health and derived indices: BMI, health risk, family burden,
    # job stability.
    'BMI', '健康风险指数', '家庭负担指数', '岗位稳定性指数',
]
|
|
|
|
|
|
# The base class is chosen at import time: when PyTorch is missing, ``nn`` is
# None and ``nn.Module`` would raise AttributeError, making the whole module
# unimportable and defeating the optional-import guard above.
class LSTMMLPRegressor(nn.Module if nn is not None else object):
    """Two-branch regressor: an LSTM over an event window plus an MLP over static features.

    The last LSTM time step and the static-branch activation are concatenated
    and passed through a small fusion head that emits one value per sample.

    Args:
        seq_input_dim: Number of features per time step in the sequence input.
        static_input_dim: Number of static features per sample.
        lstm_hidden_dim: Hidden size of the single-layer LSTM.
        static_hidden_dim: Width of the static-feature branch.
        fusion_hidden_dim: Width of the hidden layer in the fusion head.
        dropout: Dropout probability used in the static branch and fusion head.

    Raises:
        ImportError: If PyTorch is not installed.

    The defaults reproduce the original fixed architecture (48 / 32 / 48 / 0.1),
    so state dicts saved by earlier versions still load.
    """

    def __init__(
        self,
        seq_input_dim: int,
        static_input_dim: int,
        lstm_hidden_dim: int = 48,
        static_hidden_dim: int = 32,
        fusion_hidden_dim: int = 48,
        dropout: float = 0.1,
    ):
        if nn is None:
            raise ImportError('PyTorch is required to build LSTMMLPRegressor')
        super().__init__()
        self.lstm = nn.LSTM(
            input_size=seq_input_dim,
            hidden_size=lstm_hidden_dim,
            num_layers=1,
            batch_first=True,
            dropout=0.0,
        )
        self.static_net = nn.Sequential(
            nn.Linear(static_input_dim, static_hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
        )
        self.fusion = nn.Sequential(
            nn.Linear(lstm_hidden_dim + static_hidden_dim, fusion_hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(fusion_hidden_dim, 1),
        )

    def forward(self, sequence_x, static_x):
        """Return one prediction per sample.

        Args:
            sequence_x: Tensor laid out as (batch, time, seq_input_dim); the
                LSTM is built with ``batch_first=True``.
            static_x: Tensor laid out as (batch, static_input_dim).

        Returns:
            A 1-D tensor of length ``batch``.
        """
        lstm_output, _ = self.lstm(sequence_x)
        # Only the final time step summarises the window.
        sequence_repr = lstm_output[:, -1, :]
        static_repr = self.static_net(static_x)
        fused = torch.cat([sequence_repr, static_repr], dim=1)
        # The head emits (batch, 1); drop the trailing axis.
        return self.fusion(fused).squeeze(1)
|
|
|
|
|
|
def is_available() -> bool:
    """Report whether the optional PyTorch import at module load succeeded."""
    if torch is None:
        return False
    return True
|
|
|
|
|
|
def _fit_category_maps(df: pd.DataFrame, features: List[str]) -> Dict[str, Dict[str, int]]:
    """Build string-to-integer code tables for the non-numeric feature columns.

    Args:
        df: Frame the codes are learned from.
        features: Candidate column names; names absent from ``df`` and columns
            with a numeric dtype are skipped.

    Returns:
        ``{column: {str(value): code}}`` with codes assigned in sorted order
        of the stringified values, starting at 1.

    Codes start at 1 because 0 is the fallback used when a value is not found
    in the table at encoding time; starting at 0 would make unseen values
    indistinguishable from the first known category.
    """
    category_maps: Dict[str, Dict[str, int]] = {}
    for feature in features:
        if feature not in df.columns or pd.api.types.is_numeric_dtype(df[feature]):
            continue
        labels = sorted(df[feature].astype(str).unique().tolist())
        category_maps[feature] = {label: code for code, label in enumerate(labels, start=1)}
    return category_maps
|
|
|
|
|
|
def _apply_category_maps(df: pd.DataFrame, features: List[str], category_maps: Dict[str, Dict[str, int]]) -> pd.DataFrame:
    """Return a copy of ``df`` with categorical features replaced by integer codes.

    For each name in ``features``:
      * a column missing from the frame is created and filled with 0;
      * a column that has an entry in ``category_maps`` is stringified and
        looked up in that table, with values not in the table becoming 0;
      * any other column is left untouched.

    The input frame is not modified.
    """
    result = df.copy()
    for name in features:
        if name in result.columns:
            if name in category_maps:
                table = category_maps[name]
                as_text = result[name].astype(str)
                # Bind the table as a default so the lookup does not depend on
                # the loop variable.
                result[name] = as_text.map(lambda raw, table=table: table.get(raw, 0))
        else:
            result[name] = 0
    return result
|
|
|
|
|
|
def _safe_standardize(values: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Compute standardization statistics along axis 0.

    Despite the name, the input is not scaled here; only the statistics are
    returned.

    Returns:
        ``(mean, std)`` as float32. Any std below 1e-6 is replaced by 1.0 so
        that dividing by it never blows up on (near-)constant columns.
    """
    column_mean = np.mean(values, axis=0)
    column_std = np.std(values, axis=0)
    is_degenerate = column_std < 1e-6
    safe_std = np.where(is_degenerate, 1.0, column_std)
    return column_mean.astype(np.float32), safe_std.astype(np.float32)
|
|
|
|
|
|
def _build_sequence_arrays(
    df: pd.DataFrame,
    category_maps: Dict[str, Dict[str, int]],
    target_transform: str,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Turn an event frame into per-event model inputs and targets.

    The frame is passed through ``engineer_features``, categorical columns are
    encoded with ``category_maps``, and rows are sorted by employee id, event
    date index and event sequence. One sample is produced per row, so the
    returned arrays follow that SORTED order, not the order of the input.

    Args:
        df: Raw event rows; a copy is processed, the input is not modified.
        category_maps: Code tables applied to the categorical feature columns.
        target_transform: ``'log1p'`` applies ``log1p`` to targets clipped at
            0; any other value leaves targets untransformed.

    Returns:
        ``(sequences, statics, targets)`` as float32 arrays shaped
        ``(n, WINDOW_SIZE, len(SEQUENCE_FEATURES))``,
        ``(n, len(STATIC_FEATURES))`` and ``(n,)``. The shapes hold for
        ``n == 0`` as well.
    """
    df = engineer_features(df.copy())
    features = sorted(set(SEQUENCE_FEATURES + STATIC_FEATURES))
    df = _apply_category_maps(df, features, category_maps)
    df = df.sort_values(
        [config.EMPLOYEE_ID_COLUMN, config.EVENT_DATE_INDEX_COLUMN, config.EVENT_SEQUENCE_COLUMN]
    ).reset_index(drop=True)

    n_seq_features = len(SEQUENCE_FEATURES)
    n_static_features = len(STATIC_FEATURES)

    sequence_samples = []
    static_samples = []
    targets = []

    for _, group in df.groupby(config.EMPLOYEE_ID_COLUMN, sort=False):
        seq_values = group[SEQUENCE_FEATURES].astype(float).values
        static_values = group[STATIC_FEATURES].astype(float).values
        target_values = group[config.TARGET_COLUMN].astype(float).values

        for index, target in enumerate(target_values):
            # Up to WINDOW_SIZE most recent events of this employee, ending
            # at (and including) the current one.
            window_slice = seq_values[max(0, index - WINDOW_SIZE + 1): index + 1]
            # Right-align the history; missing earlier steps stay zero.
            sequence_window = np.zeros((WINDOW_SIZE, n_seq_features), dtype=np.float32)
            sequence_window[-len(window_slice):] = window_slice
            sequence_samples.append(sequence_window)
            static_samples.append(static_values[index].astype(np.float32))
            targets.append(float(target))

    targets = np.array(targets, dtype=np.float32)
    if target_transform == 'log1p':
        targets = np.log1p(np.clip(targets, a_min=0, a_max=None)).astype(np.float32)

    if sequence_samples:
        sequence_array = np.array(sequence_samples, dtype=np.float32)
        static_array = np.array(static_samples, dtype=np.float32)
    else:
        # np.array([]) would be rank-1 with shape (0,); keep the documented
        # rank so callers can still read shape[-1] or reshape.
        sequence_array = np.zeros((0, WINDOW_SIZE, n_seq_features), dtype=np.float32)
        static_array = np.zeros((0, n_static_features), dtype=np.float32)

    return sequence_array, static_array, targets
|
|
|
|
|
|
def train_lstm_mlp(
    train_df: pd.DataFrame,
    test_df: pd.DataFrame,
    model_path: str,
    target_transform: str = 'log1p',
    epochs: int = 24,
    batch_size: int = 128,
) -> Optional[Dict]:
    """Train the LSTM+MLP regressor, evaluate it on ``test_df`` and save a bundle.

    Args:
        train_df: Event rows used for fitting category codes, scaling
            statistics and the network.
        test_df: Event rows used only for the reported metrics.
        model_path: Destination passed to ``torch.save`` for the bundle.
        target_transform: ``'log1p'`` trains on ``log1p`` targets and inverts
            predictions with ``expm1``; any other value trains on raw targets.
        epochs: Number of passes over the training set.
        batch_size: Mini-batch size for training.

    Returns:
        ``None`` when PyTorch is unavailable, otherwise a dict with
        ``'metrics'`` (r2, mse, rmse, mae, rounded to 4 decimals) and
        ``'metadata'`` (window size and feature name lists).

    Side effects:
        Prints the selected training device and writes the bundle (state dict,
        feature lists, category maps, scaling statistics, default sequence
        prefix, dimensions) to ``model_path``.
    """
    if torch is None:
        return None

    used_features = sorted(set(SEQUENCE_FEATURES + STATIC_FEATURES))
    category_maps = _fit_category_maps(train_df, used_features)
    train_seq, train_static, y_train = _build_sequence_arrays(train_df, category_maps, target_transform)
    # The builder re-sorts rows (employee, date, sequence), so predictions
    # come out in that order. Ground truth must be taken from the builder as
    # well; reading it from test_df directly would pair predictions with the
    # wrong rows whenever test_df is not already sorted. Any transform name
    # other than 'log1p' makes the builder return the targets untransformed.
    test_seq, test_static, y_true = _build_sequence_arrays(test_df, category_maps, 'none')
    y_true = y_true.astype(np.float64)

    # Scaling statistics come from the training set only. Sequence statistics
    # are taken over every (sample, time step) row.
    seq_mean, seq_std = _safe_standardize(train_seq.reshape(-1, train_seq.shape[-1]))
    static_mean, static_std = _safe_standardize(train_static)

    train_seq = ((train_seq - seq_mean) / seq_std).astype(np.float32)
    test_seq = ((test_seq - seq_mean) / seq_std).astype(np.float32)
    train_static = ((train_static - static_mean) / static_std).astype(np.float32)
    test_static = ((test_static - static_mean) / static_std).astype(np.float32)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if device.type == 'cuda':
        device_name = torch.cuda.get_device_name(device)
        print(f'[lstm_mlp] Training device: CUDA ({device_name})')
    else:
        print('[lstm_mlp] Training device: CPU')
    model = LSTMMLPRegressor(seq_input_dim=train_seq.shape[-1], static_input_dim=train_static.shape[-1]).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()

    train_dataset = TensorDataset(
        torch.tensor(train_seq),
        torch.tensor(train_static),
        torch.tensor(y_train),
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    model.train()
    for _ in range(epochs):
        for batch_seq, batch_static, batch_target in train_loader:
            batch_seq = batch_seq.to(device)
            batch_static = batch_static.to(device)
            batch_target = batch_target.to(device)

            optimizer.zero_grad()
            predictions = model(batch_seq, batch_static)
            loss = criterion(predictions, batch_target)
            loss.backward()
            optimizer.step()

    model.eval()
    with torch.no_grad():
        predictions = model(
            torch.tensor(test_seq).to(device),
            torch.tensor(test_static).to(device),
        ).cpu().numpy()

    # Bring predictions back to the original target scale before scoring.
    if target_transform == 'log1p':
        y_pred = np.expm1(predictions)
    else:
        y_pred = predictions
    y_pred = np.clip(y_pred, a_min=0, a_max=None)
    mse = mean_squared_error(y_true, y_pred)

    # Average of the first WINDOW_SIZE - 1 steps of the training windows.
    # train_seq is already standardized at this point, so the stored prefix
    # lives in standardized feature space.
    default_prefix = train_seq[:, :-1, :].mean(axis=0).astype(np.float32)
    bundle = {
        'state_dict': model.state_dict(),
        'sequence_features': SEQUENCE_FEATURES,
        'static_features': STATIC_FEATURES,
        'category_maps': category_maps,
        'seq_mean': seq_mean,
        'seq_std': seq_std,
        'static_mean': static_mean,
        'static_std': static_std,
        'default_sequence_prefix': default_prefix,
        'window_size': WINDOW_SIZE,
        'target_transform': target_transform,
        'sequence_input_dim': train_seq.shape[-1],
        'static_input_dim': train_static.shape[-1],
    }
    torch.save(bundle, model_path)

    return {
        'metrics': {
            'r2': round(r2_score(y_true, y_pred), 4),
            'mse': round(mse, 4),
            'rmse': round(float(np.sqrt(mse)), 4),
            'mae': round(mean_absolute_error(y_true, y_pred), 4),
        },
        'metadata': {
            'sequence_window_size': WINDOW_SIZE,
            'sequence_feature_names': SEQUENCE_FEATURES,
            'static_feature_names': STATIC_FEATURES,
        },
    }
|
|
|
|
|
|
def load_lstm_mlp_bundle(model_path: str) -> Optional[Dict]:
    """Load a saved bundle and attach a ready-to-use model to it.

    Args:
        model_path: File previously written by ``train_lstm_mlp``.

    Returns:
        ``None`` when PyTorch is unavailable or the file does not exist.
        Otherwise the saved dict with an extra ``'model'`` entry holding an
        ``LSTMMLPRegressor`` on CPU, in eval mode, with the saved weights.

    SECURITY: the bundle contains NumPy arrays and plain Python objects in
    addition to tensors, so it must be loaded with full unpickling
    (``weights_only=False``). Unpickling can execute arbitrary code; only
    load bundle files produced by this project or another trusted source.
    """
    import inspect

    if torch is None or not os.path.exists(model_path):
        return None

    load_kwargs = {'map_location': 'cpu'}
    # Newer PyTorch defaults to weights_only=True, which refuses the NumPy
    # arrays stored in the bundle. Pass the flag only when torch.load declares
    # it, so older releases that lack the keyword keep working.
    if 'weights_only' in inspect.signature(torch.load).parameters:
        load_kwargs['weights_only'] = False
    bundle = torch.load(model_path, **load_kwargs)

    model = LSTMMLPRegressor(
        seq_input_dim=bundle['sequence_input_dim'],
        static_input_dim=bundle['static_input_dim'],
    )
    model.load_state_dict(bundle['state_dict'])
    model.eval()
    bundle['model'] = model
    return bundle
|
|
|
|
|
|
def predict_lstm_mlp(bundle: Dict, current_df: pd.DataFrame) -> float:
    """Predict the target for a single event using a loaded bundle.

    Only the first row of ``current_df`` (after feature engineering and
    category encoding) is used. Because no real event history is supplied,
    the window is completed with the bundle's ``default_sequence_prefix``.

    Args:
        bundle: Dict returned by ``load_lstm_mlp_bundle`` (must contain
            ``'model'`` plus the saved features, maps and scaling statistics).
        current_df: Frame holding the event to score.

    Returns:
        The prediction on the original target scale, never below 0.5.
    """
    df = engineer_features(current_df.copy())
    used_features = sorted(set(bundle['sequence_features'] + bundle['static_features']))
    df = _apply_category_maps(df, used_features, bundle['category_maps'])

    sequence_row = df[bundle['sequence_features']].astype(float).values[0].astype(np.float32)
    static_row = df[bundle['static_features']].astype(float).values[0].astype(np.float32)

    # Standardize only the current event. train_lstm_mlp computes the default
    # prefix from training windows that were already standardized, so scaling
    # the stacked window as a whole would standardize the prefix a second time.
    sequence_row = (sequence_row - bundle['seq_mean']) / bundle['seq_std']
    prefix = np.asarray(bundle['default_sequence_prefix'], dtype=np.float32)
    sequence_window = np.vstack([prefix, sequence_row.reshape(1, -1)]).astype(np.float32)
    static_row = ((static_row - bundle['static_mean']) / bundle['static_std']).astype(np.float32)

    with torch.no_grad():
        prediction = bundle['model'](
            torch.tensor(sequence_window).unsqueeze(0),
            torch.tensor(static_row).unsqueeze(0),
        ).cpu().numpy()[0]

    if bundle.get('target_transform') == 'log1p':
        prediction = np.expm1(prediction)
    return float(max(0.5, prediction))
|