# -*- coding: utf-8 -*-
"""
@author: yq
@time: 2025/2/27
@desc:
"""
import os
from os.path import dirname, realpath
from typing import Dict, List

import joblib
import pandas as pd
import scorecardpy as sc
import xgboost as xgb
from pandas.core.dtypes.common import is_numeric_dtype
from pypmml import Model
from sklearn.preprocessing import OneHotEncoder
from sklearn2pmml import PMMLPipeline, sklearn2pmml
from sklearn_pandas import DataFrameMapper
from tqdm import tqdm

from commom import GeneralException, f_image_crop_white_borders, f_df_to_image, f_display_title, \
    f_display_images_by_side, silent_print, df_print_nolimit
from entitys import DataSplitEntity, OnlineLearningConfigEntity, MetricFucResultEntity
from enums import ResultCodesEnum, ConstantEnum, FileEnum
from init import init
from model import f_get_model_score_bin, f_calcu_model_ks, f_stress_test, f_calcu_model_psi, Xtransformer_fit, \
    Xtransform, fit

init()


class OnlineLearningTrainerXgb:

    def __init__(self, data: DataSplitEntity = None, ol_config: OnlineLearningConfigEntity = None, *args, **kwargs):
        # Override (monkey-patch) the pipeline methods with the project's own implementations
        PMMLPipeline.Xtransformer_fit = Xtransformer_fit
        PMMLPipeline.Xtransform = Xtransform
        PMMLPipeline.fit = fit
        if ol_config is not None:
            self._ol_config = ol_config
        else:
            self._ol_config = OnlineLearningConfigEntity(*args, **kwargs)
        self._data = data
        self._df_param_optimized = None
        self._model_optimized_list = []
        self._pipeline_original: PMMLPipeline
        self._pipeline_optimized: PMMLPipeline
        self.model_optimized: xgb.XGBClassifier
        # Report template
        self._template_path = os.path.join(dirname(dirname(realpath(__file__))),
                                           "./template/OnlineLearning报告模板_xgb.docx")
        self._init(self._ol_config.path_resources)

    def _init(self, path: str):
        if not os.path.isdir(path):
            raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"【{path}】不是文件夹")
        path_model = os.path.join(path, FileEnum.PIPELINE_XGB.value)
        if not os.path.isfile(path_model):
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_model}】不存在")
        self._pipeline_original = joblib.load(path_model)
        self._pipeline_optimized = joblib.load(path_model)
        print(f"pipeline load from【{path_model}】success.")
        # If a separately saved xgb model exists, restore it into the optimized pipeline
        path_model = os.path.join(path, FileEnum.MODEL_XGB.value)
        if os.path.isfile(path_model):
            model = xgb.XGBClassifier()
            model.load_model(path_model)
            self._pipeline_optimized.steps[-1] = ("classifier", model)
            print(f"model load from【{path_model}】success.")

    def _f_rewrite_pmml(self, path_pmml: str):
        with open(path_pmml, mode="r", encoding="utf-8") as f:
            pmml = f.read()
            pmml = pmml.replace('optype="categorical" dataType="double"', 'optype="categorical" dataType="string"')
        with open(path_pmml, mode="w", encoding="utf-8") as f:
            f.write(pmml)
            f.flush()
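    # Note: the dataType rewrite above is presumably needed because sklearn2pmml can export
    # categorical fields with dataType="double" while the scoring engine expects the raw
    # categorical inputs as strings. This rationale is an assumption inferred from the
    # replacement string itself, not from sklearn2pmml documentation.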

    def _f_get_best_model(self, df_param: pd.DataFrame, ntree: int = None):
        if ntree is None:
            df_param_sort = df_param.sort_values(by=["ks_test", "auc_test"], ascending=[False, False])
            print(f"选择最佳参数:\n{df_param_sort.iloc[0].to_dict()}")
            self._train(int(df_param_sort.iloc[0]["ntree"]))
        else:
            print(f"选择ntree:【{ntree}】的参数:\n{df_param[df_param['ntree'] == ntree].iloc[0].to_dict()}")
            self._train(ntree)
        if self._ol_config.save_pmml:
            data = self._data.data
            path_pmml = self._ol_config.f_get_save_path(FileEnum.PMML_XGB.value)
            # pipeline = make_pmml_pipeline(self.model)
            sklearn2pmml(self._pipeline_optimized, path_pmml, with_repr=True)
            self._f_rewrite_pmml(path_pmml)
            print(f"pmml save to【{path_pmml}】success. ")
            # Consistency check: PMML model results vs. native pipeline results
            model_pmml = Model.fromFile(path_pmml)
            prob_pmml = model_pmml.predict(data)["probability(1)"]
            with silent_print():
                prob_pipeline = self._pipeline_optimized.predict_proba(data)[:, 1]
            diff = pd.DataFrame()
            diff["prob_pmml"] = prob_pmml
            diff["prob_pipeline"] = prob_pipeline
            diff["diff"] = diff["prob_pmml"] - diff["prob_pipeline"]
            diff["diff_format"] = diff["diff"].apply(lambda x: 1 if abs(x) < 0.001 else 0)
            print(f"pmml模型结果一致率(误差小于0.001):{(diff['diff_format'].sum() / len(diff)).round(3) * 100}%")

    def _f_get_metric_auc_ks(self, model_type: str):
        def _get_auc_ks(data, title):
            y = data[self._ol_config.y_column]
            y_prob = self.prob(data, model)
            perf = sc.perf_eva(y, y_prob, title=title, show_plot=True)
            path = self._ol_config.f_get_save_path(f"perf_{title}.png")
            perf["pic"].savefig(path)
            auc = perf["AUC"]
            ks = perf["KS"]
            f_image_crop_white_borders(path, path)
            return auc, ks, path

        train_data = self._data.train_data
        test_data = self._data.test_data
        data = self._data.data
        model = self._pipeline_optimized
        if model_type != "新模型":
            model = self._pipeline_original
        img_path_auc_ks = []
        auc, ks, path = _get_auc_ks(data, f"{model_type}-建模数据")
        img_path_auc_ks.append(path)
        train_auc, train_ks, path = _get_auc_ks(train_data, f"{model_type}-训练集")
        img_path_auc_ks.append(path)
        test_auc, test_ks, path = _get_auc_ks(test_data, f"{model_type}-测试集")
        img_path_auc_ks.append(path)
        df_auc_ks = pd.DataFrame()
        df_auc_ks["样本集"] = ["建模数据", "训练集", "测试集"]
        df_auc_ks["AUC"] = [auc, train_auc, test_auc]
        df_auc_ks["KS"] = [ks, train_ks, test_ks]
        return MetricFucResultEntity(table=df_auc_ks, image_path=img_path_auc_ks, image_size=5, table_font_size=10)

    def _f_get_metric_gain(self, model_type: str):
        y_column = self._ol_config.y_column
        data = self._data.data
        model = self._pipeline_optimized
        if model_type != "新模型":
            model = self._pipeline_original
        score = self.prob(data, model)
        score_bin, _ = f_get_model_score_bin(data, score)
        gain = f_calcu_model_ks(score_bin, y_column, sort_ascending=False)
        img_path_gain = self._ol_config.f_get_save_path(f"{model_type}-gain.png")
        f_df_to_image(gain, img_path_gain)
        return MetricFucResultEntity(table=gain, image_path=img_path_gain)

    def _f_get_stress_test(self):
        stress_sample_times = self._ol_config.stress_sample_times
        stress_bad_rate_list = self._ol_config.stress_bad_rate_list
        y_column = self._ol_config.y_column
        data = self._data.data
        score = self.prob(data, self._pipeline_optimized)
        score_bin, _ = f_get_model_score_bin(data, score)
        df_stress = f_stress_test(score_bin, sample_times=stress_sample_times, bad_rate_list=stress_bad_rate_list,
                                  target_column=y_column, score_column=ConstantEnum.SCORE.value, sort_ascending=False)
        img_path_stress = self._ol_config.f_get_save_path("stress.png")
        f_df_to_image(df_stress, img_path_stress)
        return MetricFucResultEntity(table=df_stress, image_path=img_path_stress)

    def prob(self, x: pd.DataFrame, pipeline=None, ntree_limit=None):
        if pipeline is None:
            pipeline = self._pipeline_optimized
        with silent_print():
            y_prob = pipeline.predict_proba(x, ntree_limit=ntree_limit)[:, 1]
        return y_prob
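    # Note: ntree_limit is assumed to be handled by the overridden pipeline methods imported
    # from model; in stock XGBoost the sklearn predict_proba deprecated ntree_limit in favor
    # of iteration_range from version 1.4 onwards.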

    def psi(self, x1: pd.DataFrame, x2: pd.DataFrame, points: List[float] = None, print_sum=True,
            ntree_limit=None) -> pd.DataFrame:
        y1 = self.prob(x1, ntree_limit=ntree_limit)
        y2 = self.prob(x2, ntree_limit=ntree_limit)
        x1_score_bin, score_bins = f_get_model_score_bin(x1, y1, points)
        x2_score_bin, _ = f_get_model_score_bin(x2, y2, score_bins)
        model_psi = f_calcu_model_psi(x1_score_bin, x2_score_bin, sort_ascending=False)
        if print_sum:
            print(f"模型psi: {model_psi['psi'].sum()}")
        return model_psi
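    # Note: f_calcu_model_psi is assumed to implement the standard PSI definition,
    # PSI = sum_i (p1_i - p2_i) * ln(p1_i / p2_i), where p1_i and p2_i are the sample
    # shares of score bin i in x1 and x2; a common rule of thumb reads PSI < 0.1 as
    # stable and PSI > 0.25 as significant score drift.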

    def _train(self, n_estimators: int = None):
        y_column = self._ol_config.y_column
        train_data = self._data.train_data
        params_xgb = self._ol_config.params_xgb
        model_original: xgb.XGBClassifier = self._pipeline_original.steps[-1][1]
        ntree = model_original.n_estimators if model_original.best_ntree_limit is None else model_original.best_ntree_limit
        if params_xgb.get("oltype") == "tree_refresh":
            # Refresh the leaf statistics of the existing trees on the new data
            # without changing the tree structure
            self.model_optimized = xgb.XGBClassifier(
                n_estimators=n_estimators if n_estimators else ntree,
                reg_alpha=params_xgb.get("alpha"),
                reg_lambda=params_xgb.get("lambda"),
                importance_type='weight',
                updater="refresh",
                process_type="update",
                refresh_leaf=True,
                **params_xgb,
            )
        else:
            # Handle newly added variables
            add_columns = params_xgb.get("add_columns")
            num_columns = []
            for x_column in add_columns:
                if is_numeric_dtype(train_data[x_column]):
                    num_columns.append(x_column)
            str_columns = [i for i in add_columns if i not in num_columns]
            mapper_new = []
            if len(str_columns) > 0:
                mapper_new.append((str_columns, OneHotEncoder()))
            for x_column in num_columns:
                mapper_new.append((x_column, None))
            mapper_new = DataFrameMapper(mapper_new)
            mapper_new.fit(self._data.data)
            features_new = mapper_new.features
            built_features_new = mapper_new.built_features
            # Merge the feature mappers
            mapper_old: DataFrameMapper = self._pipeline_optimized.steps[0][1]
            features_old = mapper_old.features
            features_old.extend(features_new)
            built_features_old = mapper_old.built_features
            built_features_old.extend(built_features_new)
            mapper_old.features = features_old
            mapper_old.built_features = built_features_old
            self._pipeline_optimized.steps[0] = ("mapper", mapper_old)
            # Model initialization
            self.model_optimized = xgb.XGBClassifier(
                n_estimators=n_estimators if n_estimators else params_xgb.get("num_boost_round"),
                reg_alpha=params_xgb.get("alpha"),
                reg_lambda=params_xgb.get("lambda"),
                importance_type='weight',
                **params_xgb,
            )
        self._pipeline_optimized.steps[-1] = ("classifier", self.model_optimized)
        # Align the original booster's feature names with the transformed data so it can
        # be used as a warm start, then restore them afterwards
        feature_names_old = model_original.get_booster().feature_names
        data_transform = self._pipeline_optimized.Xtransform(self._data.data)
        feature_names_new = [f"f{i}" for i in range(data_transform.shape[1])]
        model_original.get_booster().feature_names = feature_names_new
        with silent_print():
            self._pipeline_optimized.fit(train_data, train_data[y_column],
                                         classifier__verbose=False,
                                         classifier__xgb_model=model_original.get_booster(),
                                         )
        model_original.get_booster().feature_names = feature_names_old
        return ntree
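    # Summary of the two online-learning modes handled by _train, as read from the code above:
    # - oltype == "tree_refresh": XGBoost's process_type="update" with updater="refresh"
    #   re-estimates the leaf values of the already-built trees on the new training data.
    # - otherwise: num_boost_round additional trees are boosted on top of the original
    #   booster (passed via classifier__xgb_model), optionally over newly added columns.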

    def train(self):
        y_column = self._ol_config.y_column
        params_xgb = self._ol_config.params_xgb
        train_data = self._data.train_data
        test_data = self._data.test_data
        df_param_columns = ["auc_train", "ks_train", "auc_test", "ks_test", "psi", "ntree"]
        self._df_param_optimized = pd.DataFrame(columns=df_param_columns)
        ntree = self._train()
        print(f"原模型一共有【{ntree}】棵树")
        # Backtrack model performance over the iterations
        if params_xgb.get("oltype") == "tree_refresh":
            print("更新原模型模式")
            iteration_n = ntree
        else:
            print("原模型基础上新增树模式")
            iteration_n = params_xgb.get("num_boost_round")
        for n in tqdm(range(iteration_n)):
            if params_xgb.get("oltype") == "tree_refresh":
                ntree_limit = n + 1
            else:
                ntree_limit = ntree + n + 1
            with silent_print():
                train_y_prob = self._pipeline_optimized.predict_proba(train_data, ntree_limit=ntree_limit)[:, 1]
                test_y_prob = self._pipeline_optimized.predict_proba(test_data, ntree_limit=ntree_limit)[:, 1]
            train_y = train_data[y_column]
            test_y = test_data[y_column]
            psi = round(self.psi(train_data, test_data, print_sum=False, ntree_limit=ntree_limit)['psi'].sum(), 3)
            # auc_test = roc_auc_score(test_y, test_y_prob)
            # auc_test = round(auc_test, 4)
            # df = pd.DataFrame({'label': test_y, 'pred': test_y_prob})
            # dfkslift = eva_dfkslift(df)
            # ks_test = round(dfkslift["ks"].max(), 4)
            perf = sc.perf_eva(train_y, train_y_prob, show_plot=False)
            auc_train = perf["AUC"]
            ks_train = perf["KS"]
            perf = sc.perf_eva(test_y, test_y_prob, show_plot=False)
            auc_test = perf["AUC"]
            ks_test = perf["KS"]
            row = dict(zip(df_param_columns, [auc_train, ks_train, auc_test, ks_test, psi, n + 1]))
            self._df_param_optimized.loc[len(self._df_param_optimized)] = row

    def save(self):
        self._ol_config.config_save()
        if self._pipeline_optimized is None:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message="模型不存在")
        path_model = self._ol_config.f_get_save_path(FileEnum.PIPELINE_XGB.value)
        joblib.dump(self._pipeline_optimized, path_model)
        print(f"pipeline save to【{path_model}】success. ")
        # Saving the pipeline directly under xgb incremental learning can fail, so the xgb
        # model needs to be saved separately and restored afterwards:
        # path_model = self._ol_config.f_get_save_path(FileEnum.MODEL_XGB.value)
        # self.model_optimized.save_model(path_model)
        # print(f"model save to【{path_model}】success. ")

    @staticmethod
    def load(path: str):
        ol_config = OnlineLearningConfigEntity.from_config(path)
        ol_config._path_resources = path
        return OnlineLearningTrainerXgb(ol_config=ol_config)

    def report(self, ntree: int = None):
        train_data = self._data.train_data
        test_data = self._data.test_data
        self._f_get_best_model(self._df_param_optimized, ntree)
        if self._ol_config.jupyter_print:
            from IPython import display
            f_display_title(display, "模型优化过程")
            with df_print_nolimit():
                display.display(self._df_param_optimized)
        metric_value_dict = {}
        # Sample distribution
        metric_value_dict["样本分布"] = MetricFucResultEntity(table=self._data.get_distribution(self._ol_config.y_column),
                                                          table_font_size=10, table_cell_width=3)
        # Model result comparison
        metric_value_dict["模型结果-新模型"] = self._f_get_metric_auc_ks("新模型")
        metric_value_dict["模型结果-原模型"] = self._f_get_metric_auc_ks("原模型")
        # Model score PSI
        model_psi = self.psi(train_data, test_data, print_sum=False)
        img_path_psi = self._ol_config.f_get_save_path("model_psi.png")
        f_df_to_image(model_psi, img_path_psi)
        metric_value_dict["模型稳定性"] = MetricFucResultEntity(table=model_psi,
                                                           value=model_psi["psi"].sum().round(3),
                                                           image_path=img_path_psi)
        # Score binning
        metric_value_dict["分数分箱-建模数据-新模型"] = self._f_get_metric_gain("新模型")
        metric_value_dict["分数分箱-建模数据-原模型"] = self._f_get_metric_gain("原模型")
        # Stress test
        if self._ol_config.stress_test:
            metric_value_dict["压力测试"] = self._f_get_stress_test()
        if self._ol_config.jupyter_print:
            self.jupyter_print(metric_value_dict)
        # save_path = self._ol_config.f_get_save_path("OnlineLearning报告.docx")
        # ReportWord.generate_report(metric_value_dict, self._template_path, save_path=save_path)
        # print(f"模型报告文件储存路径:{save_path}")

    def jupyter_print(self, metric_value_dict: Dict[str, MetricFucResultEntity]):
        from IPython import display
        f_display_title(display, "样本分布")
        display.display(metric_value_dict["样本分布"].table)
        f_display_title(display, "模型结果")
        print("原模型")
        display.display(metric_value_dict["模型结果-原模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-原模型"].image_path)
        print("新模型")
        display.display(metric_value_dict["模型结果-新模型"].table)
        f_display_images_by_side(display, metric_value_dict["模型结果-新模型"].image_path)
        # Model PSI
        f_display_title(display, "模型psi")
        display.display(metric_value_dict["模型稳定性"].table)
        print(f"模型psi: {metric_value_dict['模型稳定性'].value}")
        f_display_title(display, "分数分箱")
        print("建模数据上分数分箱")
        print("原模型")
        display.display(metric_value_dict["分数分箱-建模数据-原模型"].table)
        print("新模型")
        display.display(metric_value_dict["分数分箱-建模数据-新模型"].table)
        if "压力测试" in metric_value_dict.keys():
            f_display_title(display, "压力测试")
            display.display(metric_value_dict["压力测试"].table)


if __name__ == "__main__":
    pass
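    # Minimal usage sketch (hedged: `data_split` and `ol_config` are hypothetical objects
    # that would be built from project data and a saved OnlineLearning config):
    # trainer = OnlineLearningTrainerXgb(data=data_split, ol_config=ol_config)
    # trainer.train()   # fit in tree_refresh or tree-append mode and backtrack metrics
    # trainer.report()  # pick the best ntree and render the evaluation artifacts
    # trainer.save()    # persist the config and the optimized pipeline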