# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2025/2/27
@desc: Online-learning trainer that fine-tunes the coefficients of an existing
       WOE-based logistic-regression model and produces a comparison report.
"""
- import json
- import math
- import os
- import re
- from os.path import dirname, realpath
- from typing import Dict, List
- import matplotlib.pyplot as plt
- import numpy as np
- import pandas as pd
- import scorecardpy as sc
- import torch
- import torch.nn as nn
- import torch.optim as optim
- from tqdm import tqdm
- from commom import GeneralException, f_image_crop_white_borders, f_df_to_image, f_display_title, \
- f_display_images_by_side
- from entitys import DataSplitEntity, OnlineLearningConfigEntity, MetricFucResultEntity
- from enums import ResultCodesEnum, ConstantEnum, ContextEnum, FileEnum
- from feature import f_woebin_load
- from init import init, context
- from model import f_get_model_score_bin, f_calcu_model_ks, f_stress_test, f_calcu_model_psi
- from monitor import ReportWord
- from .utils import LR
- init()
class OnlineLearningTrainer:
    """Fine-tune the coefficients of an existing WOE logistic-regression model.

    The trainer loads a coefficient file and WOE bins from a resource folder,
    builds two ``LR`` models from the same weights (an untouched "original" and
    an "optimized" copy), trains the optimized copy with Adam + BCE loss, and
    can render a report comparing both models.
    """

    def __init__(self, data: DataSplitEntity = None, ol_config: OnlineLearningConfigEntity = None, *args, **kwargs):
        """Create the trainer and load model resources.

        Args:
            data: Train/test split used for training and reporting.
            ol_config: Ready-made configuration. When omitted, one is built
                from ``*args`` / ``**kwargs``.

        Raises:
            GeneralException: Propagated from ``_init`` when the resource
                folder or the coefficient file is missing.
        """
        if ol_config is not None:
            self._ol_config = ol_config
        else:
            self._ol_config = OnlineLearningConfigEntity(*args, **kwargs)
        self._data = data
        self._columns = None
        # Both models are assigned in ``_init``; declared here for readers/type checkers.
        self._model_original: LR
        self._model_optimized: LR
        self.sc_woebin = None
        self.card_cfg = None
        self.card = None
        # Report template, resolved relative to the parent of this file's directory.
        self._template_path = os.path.join(dirname(dirname(realpath(__file__))),
                                           "./template/OnlineLearning报告模板_lr.docx")
        self._init(self._ol_config.path_resources)

    def _init(self, path: str):
        """Load coefficients, optional scorecard config and WOE bins from ``path``.

        Args:
            path: Resource folder containing the coefficient file, the WOE bins
                and optionally the scorecard configuration.

        Raises:
            GeneralException: If ``path`` is not a directory, the coefficient
                file does not exist, or a model variable has no WOE bin.
        """
        if not os.path.isdir(path):
            raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"【{path}】不是文件夹")

        path_coef = os.path.join(path, FileEnum.COEF.value)
        if not os.path.isfile(path_coef):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型系数文件【{path_coef}】不存在")
        with open(path_coef, mode="r", encoding="utf-8") as f:
            coef = json.load(f)
        print(f"coef load from【{path_coef}】success.")

        # The scorecard configuration is optional.
        path_card_cfg = os.path.join(path, FileEnum.CARD_CFG.value)
        if os.path.isfile(path_card_cfg):
            with open(path_card_cfg, mode="r", encoding="utf-8") as f:
                self.card_cfg = json.load(f)
            print(f"{FileEnum.CARD_CFG.value} load from【{path_card_cfg}】success.")

        # Sort the keys so the weight order is deterministic regardless of file order.
        self._columns = sorted(coef.keys())
        # Force float64: every input tensor in this class is float64, and an
        # all-integer coefficient file would otherwise produce an int64 tensor
        # that cannot be wrapped in nn.Parameter.
        weight = np.array([coef[k] for k in self._columns], dtype=np.float64)
        # Two independent tensors so training never mutates the original model.
        self._model_original = LR(nn.Parameter(torch.tensor(weight.copy())))
        self._model_optimized = LR(nn.Parameter(torch.tensor(weight.copy())))

        self._columns = [re.sub('_woe$', '', i) for i in self._columns]
        # Drop the intercept: the WOE encoding has no constant term.
        self._columns_intercept_remove = self._columns.copy()
        if ConstantEnum.INTERCEPT.value in self._columns_intercept_remove:
            self._columns_intercept_remove.remove(ConstantEnum.INTERCEPT.value)
        # WOE-encoded columns carry the ``_woe`` suffix.
        self._columns_woe = [f"{i}_woe" for i in self._columns]

        self.sc_woebin = f_woebin_load(path)
        for k in self._columns_intercept_remove:
            if k not in self.sc_woebin:
                # The original built this exception without raising it, so the check never fired.
                raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"模型变量【{k}】在woe特征里不存在")

    def _feature_generate(self, data: pd.DataFrame) -> np.ndarray:
        """WOE-encode ``data`` and return the model input matrix.

        Columns are ordered like ``self._columns_woe``; the intercept column is
        filled with ones.
        """
        data_woe = sc.woebin_ply(data[self._columns_intercept_remove], self.sc_woebin, print_info=False)
        data_woe[f"{ConstantEnum.INTERCEPT.value}_woe"] = [1] * len(data_woe)
        return data_woe[self._columns_woe].to_numpy()

    def _f_get_best_model(self, df_param: pd.DataFrame, epoch: int = None) -> LR:
        """Build an ``LR`` model from one row of the training history.

        Args:
            df_param: History produced by ``train`` (one weight column per
                model variable plus metric columns).
            epoch: Epoch to pick. When ``None`` the row with the highest
                ``ks_test`` (ties broken by ``auc_test``) is used.

        Raises:
            GeneralException: If ``epoch`` is given but not present in ``df_param``.
        """
        if epoch is None:
            df_param_sort = df_param.sort_values(by=["ks_test", "auc_test"], ascending=[False, False])
            row = df_param_sort.iloc[0]
            print(f"选择最佳参数:\n{row.to_dict()}")
        else:
            df_epoch = df_param[df_param["epoch"] == epoch]
            if df_epoch.empty:
                raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"epoch【{epoch}】不存在")
            row = df_epoch.iloc[0]
            print(f"选择epoch:【{epoch}】的参数:\n{row.to_dict()}")
        # Select weights by column name instead of slicing off a fixed number of
        # trailing metric columns.
        weight = np.array([row[c] for c in self._columns], dtype=np.float64)
        return LR(nn.Parameter(torch.tensor(weight)))

    def _f_get_scorecard(self, ):
        """Build ``self.card`` from the optimized weights via ``sc.scorecard``."""

        class M:
            """Minimal stand-in exposing the ``coef_`` / ``intercept_`` attributes passed to ``sc.scorecard``."""

            def __init__(self, ):
                pass

        m = M()
        m.coef_ = [self._model_optimized.linear.weight.tolist()]
        m.intercept_ = [0]
        self.card = sc.scorecard(self.sc_woebin, m, self._columns_woe, **self.card_cfg)

    def _f_get_metric_auc_ks(self, model_type: str):
        """Compute AUC/KS on the full, train and test data for one model.

        Args:
            model_type: ``"新模型"`` selects the optimized model; any other
                value selects the original model.

        Returns:
            MetricFucResultEntity holding the AUC/KS table and the three plot paths.
        """
        model = self._model_optimized if model_type == "新模型" else self._model_original

        def _get_auc_ks(data, title):
            y = data[self._ol_config.y_column]
            y_prob = self.prob(data, model)
            perf = sc.perf_eva(y, y_prob, title=f"{title}", show_plot=True)
            path = self._ol_config.f_get_save_path(f"perf_{title}.png")
            perf["pic"].savefig(path)
            f_image_crop_white_borders(path, path)
            return perf["AUC"], perf["KS"], path

        train_data = self._data.train_data
        test_data = self._data.test_data
        data = self._data.data

        auc_list, ks_list, img_path_auc_ks = [], [], []
        for name, subset in (("建模数据", data), ("训练集", train_data), ("测试集", test_data)):
            auc, ks, path = _get_auc_ks(subset, f"{model_type}-{name}")
            auc_list.append(auc)
            ks_list.append(ks)
            img_path_auc_ks.append(path)

        df_auc_ks = pd.DataFrame()
        df_auc_ks["样本集"] = ["建模数据", "训练集", "测试集"]
        df_auc_ks["AUC"] = auc_list
        df_auc_ks["KS"] = ks_list
        return MetricFucResultEntity(table=df_auc_ks, image_path=img_path_auc_ks, image_size=5, table_font_size=10)

    def _f_get_metric_trend(self, ):
        """Plot the per-variable WOE trend on the modelling data using the loaded bin breaks."""
        y_column = self._ol_config.y_column
        data = self._data.data
        # Re-bin the modelling data with the breaks / special values of the loaded bins.
        breaks_list = {}
        special_values = {}
        for column, df_bin in self.sc_woebin.items():
            is_special = df_bin["is_special_values"]
            breaks_list[column] = list(df_bin[is_special == False]['breaks'])  # noqa: E712 (element-wise compare)
            sv = list(df_bin[is_special == True]['breaks'])  # noqa: E712
            if len(sv) > 0:
                special_values[column] = sv
        woebin = sc.woebin(data[self._columns_intercept_remove + [y_column]], y=y_column, breaks_list=breaks_list,
                           special_values=special_values, print_info=False)
        imgs_path = []
        for k, df_bin in woebin.items():
            sc.woebin_plot(df_bin)
            path = self._ol_config.f_get_save_path(f"trend_{k}.png")
            plt.savefig(path)
            imgs_path.append(path)
        return MetricFucResultEntity(image_path=imgs_path, image_size=4)

    def _f_get_metric_coef(self, ):
        """Tabulate original vs. optimized coefficients (rounded to 4 decimals) with annotations."""
        columns_anns = self._ol_config.columns_anns
        df = pd.DataFrame()
        df["变量"] = self._columns
        df["原变量WOE拟合系数"] = [round(i, 4) for i in self._model_original.linear.weight.tolist()]
        df["新变量WOE拟合系数"] = [round(i, 4) for i in self._model_optimized.linear.weight.tolist()]
        df["释义"] = [columns_anns.get(column, "-") for column in self._columns]
        img_path_coef = self._ol_config.f_get_save_path("coef.png")
        f_df_to_image(df, img_path_coef)
        return MetricFucResultEntity(table=df, image_path=img_path_coef)

    def _f_get_metric_gain(self, model_type: str):
        """Compute the score-bin table on the modelling data for one model.

        Args:
            model_type: ``"新模型"`` selects the optimized model; any other
                value selects the original model.
        """
        y_column = self._ol_config.y_column
        data = self._data.data
        model = self._model_optimized if model_type == "新模型" else self._model_original
        score = self.prob(data, model)
        score_bin, _ = f_get_model_score_bin(data, score)
        gain = f_calcu_model_ks(score_bin, y_column, sort_ascending=False)
        img_path_gain = self._ol_config.f_get_save_path(f"{model_type}-gain.png")
        f_df_to_image(gain, img_path_gain)
        return MetricFucResultEntity(table=gain, image_path=img_path_gain)

    def _f_get_stress_test(self, ):
        """Run ``f_stress_test`` on the optimized model's score bins of the modelling data."""
        stress_sample_times = self._ol_config.stress_sample_times
        stress_bad_rate_list = self._ol_config.stress_bad_rate_list
        y_column = self._ol_config.y_column
        data = self._data.data
        score = self.prob(data, self._model_optimized)
        score_bin, _ = f_get_model_score_bin(data, score)
        df_stress = f_stress_test(score_bin, sample_times=stress_sample_times, bad_rate_list=stress_bad_rate_list,
                                  target_column=y_column, score_column=ConstantEnum.SCORE.value, sort_ascending=False)
        img_path_stress = self._ol_config.f_get_save_path("stress.png")
        f_df_to_image(df_stress, img_path_stress)
        return MetricFucResultEntity(table=df_stress, image_path=img_path_stress)

    def prob(self, x: pd.DataFrame, model=None):
        """Return the model output for raw (un-encoded) features ``x`` as a numpy array.

        Args:
            x: Raw feature frame; it is WOE-encoded internally.
            model: Model to evaluate; defaults to the optimized model.
        """
        if model is None:
            model = self._model_optimized
        model.eval()
        with torch.no_grad():
            x = torch.tensor(self._feature_generate(x), dtype=torch.float64)
            y_prob = model(x)
            y_prob = y_prob.detach().numpy()
        return y_prob

    def score(self, x: pd.DataFrame) -> np.ndarray:
        """Return the scorecard ``score`` column for ``x`` (requires ``self.card`` to be built)."""
        return np.array(sc.scorecard_ply(x, self.card, print_step=0)["score"])

    def psi(self, x1: pd.DataFrame, x2: pd.DataFrame, points: List[float] = None) -> pd.DataFrame:
        """Compute the model PSI between two samples.

        ``x1`` defines the score bins (optionally from ``points``); ``x2`` is
        binned with the same cut points.
        """
        y1 = self.prob(x1)
        y2 = self.prob(x2)
        x1_score_bin, score_bins = f_get_model_score_bin(x1, y1, points)
        x2_score_bin, _ = f_get_model_score_bin(x2, y2, score_bins)
        model_psi = f_calcu_model_psi(x1_score_bin, x2_score_bin, sort_ascending=False)
        print(f"模型psi: {model_psi['psi'].sum()}")
        return model_psi

    def train(self, ):
        """Train the optimized model and store the per-epoch history in ``context``.

        One history row is recorded for the original model (epoch 0) and one
        after every training epoch. ``loss_train`` is the loss of the last
        mini-batch of that epoch.
        """
        epochs = self._ol_config.epochs
        batch_size = self._ol_config.batch_size
        y_column = self._ol_config.y_column
        train_data = self._data.train_data
        test_data = self._data.test_data

        # Build every tensor once instead of re-creating them per batch / per epoch.
        train_x = torch.tensor(self._feature_generate(train_data), dtype=torch.float64)
        train_y = torch.tensor(train_data[y_column].to_numpy(), dtype=torch.float64)
        test_x = torch.tensor(self._feature_generate(test_data), dtype=torch.float64)
        test_y = test_data[y_column]
        test_y_tensor = torch.tensor(test_y.to_numpy(), dtype=torch.float64)

        criterion = nn.BCELoss()
        optimizer = optim.Adam(self._model_optimized.parameters(), lr=self._ol_config.lr)

        df_param_columns = self._columns + ["auc_test", "ks_test", "epoch", "loss_train", "loss_test"]
        df_param = pd.DataFrame(columns=df_param_columns)

        def _get_param_optimized(model: LR, epoch, loss_train):
            """Evaluate ``model`` on the test set and return one history row."""
            model.eval()
            with torch.no_grad():
                y_prob = model(test_x)
                loss_test = criterion(y_prob, test_y_tensor).detach().item()
                y_prob = y_prob.detach().numpy()
            perf = sc.perf_eva(test_y, y_prob, show_plot=False)
            row = model.linear.weight.tolist() + [perf["AUC"], perf["KS"], epoch + 1, loss_train, loss_test]
            return dict(zip(df_param_columns, row))

        # Baseline before any optimisation.
        loss_train = 0
        df_param.loc[len(df_param)] = _get_param_optimized(self._model_original, -1, loss_train)

        data_len = len(train_x)
        n_batches = math.ceil(data_len / batch_size)
        for epoch in tqdm(range(epochs)):
            for i in range(n_batches):
                batch = slice(i * batch_size, (i + 1) * batch_size)
                self._model_optimized.train()
                optimizer.zero_grad()
                y_prob = self._model_optimized(train_x[batch])
                loss = criterion(y_prob, train_y[batch])
                loss.backward()
                optimizer.step()
                loss_train = loss.detach().item()
            # Evaluate on the test set after each epoch.
            df_param.loc[len(df_param)] = _get_param_optimized(self._model_optimized, epoch, loss_train)

        context.set(ContextEnum.PARAM_OPTIMIZED, df_param)

    def save(self):
        """Persist the config, WOE bins, optimized coefficients and (if built) the scorecard.

        Raises:
            GeneralException: If the WOE bins or the optimized model are missing.
        """
        self._ol_config.config_save()

        if self.sc_woebin is None:
            # The original built this exception without raising it.
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"feature不存在")
        df_woebin = pd.concat(self.sc_woebin.values())
        path = self._ol_config.f_get_save_path(FileEnum.FEATURE.value)
        df_woebin.to_csv(path)
        print(f"feature save to【{path}】success. ")

        if getattr(self, "_model_optimized", None) is None:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型不存在")
        path = self._ol_config.f_get_save_path(FileEnum.COEF.value)
        with open(path, mode="w", encoding="utf-8") as f:
            coef = dict(zip(self._columns, self._model_optimized.linear.weight.tolist()))
            f.write(json.dumps(coef, ensure_ascii=False))
        print(f"model save to【{path}】success. ")

        if self.card is not None:
            df_card = pd.concat(self.card.values())
            path = self._ol_config.f_get_save_path(FileEnum.CARD.value)
            df_card.to_csv(path)
            # Message fixed: this branch writes the scorecard, not the model.
            print(f"card save to【{path}】success. ")

    @staticmethod
    def load(path: str):
        """Recreate a trainer (without data) from a folder previously written by ``save``."""
        ol_config = OnlineLearningConfigEntity.from_config(path)
        ol_config._path_resources = path
        return OnlineLearningTrainer(ol_config=ol_config)

    def report(self, epoch: int = None):
        """Select the final model, compute all metrics and write the Word report.

        Args:
            epoch: Epoch whose weights become the optimized model. When
                ``None`` the best epoch by test KS/AUC is used.

        Raises:
            GeneralException: If no training history is available or ``epoch``
                does not exist in it.
        """
        df_param = context.get(ContextEnum.PARAM_OPTIMIZED)
        if df_param is None:
            # Guard against calling report() before train().
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message="模型优化参数不存在,请先执行train")
        self._model_optimized = self._f_get_best_model(df_param, epoch)

        if self._ol_config.jupyter_print:
            from IPython import display
            f_display_title(display, "模型系数优化过程")
            display.display(df_param)

        metric_value_dict = {}

        # Scorecard (only when a scorecard configuration was loaded).
        if self.card_cfg is not None:
            self._f_get_scorecard()
            df_card = pd.concat(self.card.values())
            img_path_card = self._ol_config.f_get_save_path("card.png")
            f_df_to_image(df_card, img_path_card)
            metric_value_dict["评分卡"] = MetricFucResultEntity(table=df_card, image_path=img_path_card)

        # Sample distribution.
        metric_value_dict["样本分布"] = MetricFucResultEntity(table=self._data.get_distribution(self._ol_config.y_column),
                                                          table_font_size=10, table_cell_width=3)

        # Model result comparison.
        metric_value_dict["模型结果-新模型"] = self._f_get_metric_auc_ks("新模型")
        metric_value_dict["模型结果-原模型"] = self._f_get_metric_auc_ks("原模型")

        # Variable trend.
        metric_value_dict["变量趋势-建模数据"] = self._f_get_metric_trend()

        # Coefficient comparison.
        metric_value_dict["模型系数"] = self._f_get_metric_coef()

        # Score bins.
        metric_value_dict["分数分箱-建模数据-新模型"] = self._f_get_metric_gain("新模型")
        metric_value_dict["分数分箱-建模数据-原模型"] = self._f_get_metric_gain("原模型")

        # Stress test.
        if self._ol_config.stress_test:
            metric_value_dict["压力测试"] = self._f_get_stress_test()

        if self._ol_config.jupyter_print:
            self.jupyter_print(metric_value_dict)

        save_path = self._ol_config.f_get_save_path("OnlineLearning报告.docx")
        ReportWord.generate_report(metric_value_dict, self._template_path, save_path=save_path)
        print(f"模型报告文件储存路径:{save_path}")

    def jupyter_print(self, metric_value_dict: Dict[str, MetricFucResultEntity]):
        """Display the report sections inline in a Jupyter notebook.

        Args:
            metric_value_dict: Section name -> metric result, as built by ``report``.
                (The original signature used ``=`` instead of ``:``, which made
                the typing alias an unusable default value.)
        """
        from IPython import display

        f_display_title(display, "样本分布")
        display.display(metric_value_dict["样本分布"].table)

        f_display_title(display, "模型结果")
        print("原模型")
        display.display(metric_value_dict["模型结果-原模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-原模型"].image_path)
        print("新模型")
        display.display(metric_value_dict["模型结果-新模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-新模型"].image_path)

        f_display_title(display, "模型系数")
        display.display(metric_value_dict["模型系数"].table)

        f_display_title(display, "分数分箱")
        print("建模数据上分数分箱")
        print("原模型")
        display.display(metric_value_dict["分数分箱-建模数据-原模型"].table)
        print("新模型")
        display.display(metric_value_dict["分数分箱-建模数据-新模型"].table)

        f_display_title(display, "变量趋势")
        print("建模数据上变量趋势")
        f_display_images_by_side(display, metric_value_dict["变量趋势-建模数据"].image_path)

        if "压力测试" in metric_value_dict:
            f_display_title(display, "压力测试")
            display.display(metric_value_dict["压力测试"].table)

        # Scorecard.
        if "评分卡" in metric_value_dict:
            f_display_title(display, "评分卡")
            display.display(metric_value_dict["评分卡"].table)
# Script entry guard: intentionally a no-op, this module is meant to be imported.
if __name__ == "__main__":
    pass
|