trainer_xgb.py 15 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. @author: yq
  4. @time: 2025/2/27
  5. @desc:
  6. """
  7. import os
  8. from os.path import dirname, realpath
  9. from typing import Dict, List
  10. import joblib
  11. import pandas as pd
  12. import scorecardpy as sc
  13. import xgboost as xgb
  14. from pypmml import Model
  15. from sklearn2pmml import PMMLPipeline, sklearn2pmml
  16. from tqdm import tqdm
  17. from commom import GeneralException, f_image_crop_white_borders, f_df_to_image, f_display_title, \
  18. f_display_images_by_side, silent_print, df_print_nolimit
  19. from entitys import DataSplitEntity, OnlineLearningConfigEntity, MetricFucResultEntity
  20. from enums import ResultCodesEnum, ConstantEnum, FileEnum
  21. from init import init
  22. from model import f_get_model_score_bin, f_calcu_model_ks, f_stress_test, f_calcu_model_psi, Xtransformer_fit, \
  23. Xtransform, fit
  24. init()
  25. class OnlineLearningTrainerXgb:
  def __init__(self, data: DataSplitEntity = None, ol_config: OnlineLearningConfigEntity = None, *args, **kwargs):
      """Create the trainer and load the persisted pipeline(s).

      Args:
          data: Data split holder, stored on ``self._data``.
          ol_config: Ready-made configuration. When it is ``None`` a new
              ``OnlineLearningConfigEntity`` is built from ``*args`` / ``**kwargs``.
          *args: Forwarded to ``OnlineLearningConfigEntity`` when ``ol_config`` is ``None``.
          **kwargs: Forwarded to ``OnlineLearningConfigEntity`` when ``ol_config`` is ``None``.
      """
      # Override these PMMLPipeline methods with the implementations imported from `model`.
      for method_name, replacement in (
              ("Xtransformer_fit", Xtransformer_fit),
              ("Xtransform", Xtransform),
              ("fit", fit),
      ):
          setattr(PMMLPipeline, method_name, replacement)

      self._ol_config = ol_config if ol_config is not None else OnlineLearningConfigEntity(*args, **kwargs)
      self._data = data
      self._df_param_optimized = None
      self._model_optimized_list = []

      # Declarations only: the pipelines are assigned in `_init`, the model in `_train`.
      self._pipeline_original: PMMLPipeline
      self._pipeline_optimized: PMMLPipeline
      self.model_optimized: xgb.XGBClassifier

      # Report template, resolved relative to the parent of this file's directory.
      project_root = dirname(dirname(realpath(__file__)))
      self._template_path = os.path.join(project_root, "./template/OnlineLearning报告模板_xgb.docx")

      self._init(self._ol_config.path_resources)
  def _init(self, path: str):
      """Load the persisted pipeline(s) from the resource directory ``path``.

      The pipeline file is loaded twice so that ``_pipeline_original`` and
      ``_pipeline_optimized`` are two separate objects. If a separately saved
      XGBoost model file is also present, it replaces the last step of the
      optimized pipeline and is exposed as ``self.model_optimized``.

      Args:
          path: Directory that contains the saved model files.

      Raises:
          GeneralException: ``ILLEGAL_PARAMS`` when ``path`` is not a directory,
              ``NOT_FOUND`` when the pipeline file is missing.
      """
      if not os.path.isdir(path):
          raise GeneralException(ResultCodesEnum.ILLEGAL_PARAMS, message=f"【{path}】不是文件夹")

      path_model = os.path.join(path, FileEnum.MODEL.value)
      if not os.path.isfile(path_model):
          raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型文件【{path_model}】不存在")

      # NOTE(review): joblib.load unpickles the file, which can execute arbitrary
      # code - only point this at trusted resource directories.
      self._pipeline_original = joblib.load(path_model)
      self._pipeline_optimized = joblib.load(path_model)
      print(f"model load from【{path_model}】success.")

      path_model_xgb = os.path.join(path, FileEnum.MODEL_XGB.value)
      if os.path.isfile(path_model_xgb):
          model = xgb.XGBClassifier()
          model.load_model(path_model_xgb)
          self._pipeline_optimized.steps[-1] = ("classifier", model)
          # Keep the attribute in sync with the pipeline step; without this a
          # reloaded trainer has no `model_optimized` and `save()` fails.
          self.model_optimized = model
          print(f"model load from【{path_model_xgb}】success.")
  60. def _f_rewrite_pmml(self, path_pmml: str):
  61. with open(path_pmml, mode="r", encoding="utf-8") as f:
  62. pmml = f.read()
  63. pmml = pmml.replace('optype="categorical" dataType="double"', 'optype="categorical" dataType="string"')
  64. with open(path_pmml, mode="w", encoding="utf-8") as f:
  65. f.write(pmml)
  66. f.flush()
  67. def _f_get_best_model(self, df_param: pd.DataFrame, ntree: int = None):
  68. if ntree is None:
  69. df_param_sort = df_param.sort_values(by=["ks_test", "auc_test"], ascending=[False, False])
  70. print(f"选择最佳参数:\n{df_param_sort.iloc[0].to_dict()}")
  71. self._train(int(df_param_sort.iloc[0][2]))
  72. else:
  73. print(f"选择ntree:【{ntree}】的参数:\n{df_param[df_param['ntree'] == ntree].iloc[0].to_dict()}")
  74. self._train(ntree)
  75. if self._ol_config.save_pmml:
  76. data = self._data.data
  77. path_pmml = self._ol_config.f_get_save_path(FileEnum.PMML.value)
  78. # pipeline = make_pmml_pipeline(self.model)
  79. sklearn2pmml(self._pipeline_optimized, path_pmml, with_repr=True, )
  80. self._f_rewrite_pmml(path_pmml)
  81. print(f"model save to【{path_pmml}】success. ")
  82. # pmml与原生模型结果一致性校验
  83. model_pmml = Model.fromFile(path_pmml)
  84. prob_pmml = model_pmml.predict(data)["probability(1)"]
  85. prob_pipeline = self._pipeline_optimized.predict_proba(data)[:, 1]
  86. diff = pd.DataFrame()
  87. diff["prob_pmml"] = prob_pmml
  88. diff["prob_pipeline"] = prob_pipeline
  89. diff["diff"] = diff["prob_pmml"] - diff["prob_pipeline"]
  90. diff["diff_format"] = diff["diff"].apply(lambda x: 1 if abs(x) < 0.001 else 0)
  91. print(f"pmml模型结果一致率(误差小于0.001):{len(diff) / diff['diff_format'].sum().round(3) * 100}%")
  def _f_get_metric_auc_ks(self, model_type: str):
      """Compute AUC/KS on the full, train and test data for one of the two pipelines.

      Args:
          model_type: ``"新模型"`` selects the optimized pipeline; any other
              value selects the original pipeline. The value is also used in
              plot titles and image file names.

      Returns:
          MetricFucResultEntity holding a three-row AUC/KS table and the paths
          of the three saved performance plots.
      """
      train_data = self._data.train_data
      test_data = self._data.test_data
      data = self._data.data
      pipeline = self._pipeline_optimized if model_type == "新模型" else self._pipeline_original

      def _evaluate(df, title):
          # Plot, save the figure, crop its white borders, and return (auc, ks, image path).
          labels = df[self._ol_config.y_column]
          perf = sc.perf_eva(labels, self.prob(df, pipeline), title=f"{title}", show_plot=True)
          img_path = self._ol_config.f_get_save_path(f"perf_{title}.png")
          perf["pic"].savefig(img_path)
          f_image_crop_white_borders(img_path, img_path)
          return perf["AUC"], perf["KS"], img_path

      sample_names, auc_values, ks_values, img_path_auc_ks = [], [], [], []
      for sample_name, df in (("建模数据", data), ("训练集", train_data), ("测试集", test_data)):
          auc, ks, img_path = _evaluate(df, f"{model_type}-{sample_name}")
          sample_names.append(sample_name)
          auc_values.append(auc)
          ks_values.append(ks)
          img_path_auc_ks.append(img_path)

      df_auc_ks = pd.DataFrame({"样本集": sample_names, "AUC": auc_values, "KS": ks_values})
      return MetricFucResultEntity(table=df_auc_ks, image_path=img_path_auc_ks, image_size=5, table_font_size=10)
  def _f_get_metric_gain(self, model_type: str):
      """Build the score-bin gain table on the full data set and render it as an image.

      Args:
          model_type: ``"新模型"`` selects the optimized pipeline; any other
              value selects the original pipeline. Also used in the image name.

      Returns:
          MetricFucResultEntity with the gain table and the path of its image.
      """
      pipeline = self._pipeline_optimized if model_type == "新模型" else self._pipeline_original
      data = self._data.data

      score_bin, _ = f_get_model_score_bin(data, self.prob(data, pipeline))
      gain = f_calcu_model_ks(score_bin, self._ol_config.y_column, sort_ascending=False)

      img_path_gain = self._ol_config.f_get_save_path(f"{model_type}-gain.png")
      f_df_to_image(gain, img_path_gain)
      return MetricFucResultEntity(table=gain, image_path=img_path_gain)
  def _f_get_stress_test(self):
      """Run the stress test on the optimized model's scores over the full data set.

      Sampling times and bad-rate list are read from the configuration.

      Returns:
          MetricFucResultEntity with the stress-test table and the path of its image.
      """
      cfg = self._ol_config
      data = self._data.data

      score_bin, _ = f_get_model_score_bin(data, self.prob(data, self._pipeline_optimized))
      df_stress = f_stress_test(
          score_bin,
          sample_times=cfg.stress_sample_times,
          bad_rate_list=cfg.stress_bad_rate_list,
          target_column=cfg.y_column,
          score_column=ConstantEnum.SCORE.value,
          sort_ascending=False,
      )

      img_path_stress = cfg.f_get_save_path("stress.png")
      f_df_to_image(df_stress, img_path_stress)
      return MetricFucResultEntity(table=df_stress, image_path=img_path_stress)
  def prob(self, x: pd.DataFrame, pipeline=None, ntree_limit=None):
      """Return column 1 of ``predict_proba`` for ``x``.

      Args:
          x: Input rows to score.
          pipeline: Object exposing ``predict_proba(x, ntree_limit=...)``.
              Defaults to the optimized pipeline when ``None``.
          ntree_limit: Passed through unchanged to ``predict_proba``.

      Returns:
          The second column (index 1) of the ``predict_proba`` result.
      """
      if pipeline is not None:
          scorer = pipeline
      else:
          scorer = self._pipeline_optimized
      proba = scorer.predict_proba(x, ntree_limit=ntree_limit)
      return proba[:, 1]
  def psi(self, x1: pd.DataFrame, x2: pd.DataFrame, points: List[float] = None, print_sum=True,
          ntree_limit=None) -> pd.DataFrame:
      """Compute the model-score PSI table between two data sets.

      Both data sets are scored with the optimized pipeline. ``x1`` is binned
      first; the second value returned by that binning call is passed as the
      ``points`` argument when binning ``x2``.

      Args:
          x1: First data set.
          x2: Second data set.
          points: Optional points forwarded to the binning of ``x1``.
          print_sum: When true, print the sum of the ``psi`` column.
          ntree_limit: Passed through to ``prob`` for both data sets.

      Returns:
          The table produced by ``f_calcu_model_psi``.
      """
      base_prob = self.prob(x1, ntree_limit=ntree_limit)
      compare_prob = self.prob(x2, ntree_limit=ntree_limit)

      base_bin, shared_bins = f_get_model_score_bin(x1, base_prob, points)
      compare_bin, _ = f_get_model_score_bin(x2, compare_prob, shared_bins)

      model_psi = f_calcu_model_psi(base_bin, compare_bin, sort_ascending=False)
      if print_sum:
          print(f"模型psi: {model_psi['psi'].sum()}")
      return model_psi
  def _train(self, n_estimators: int = None):
      """Refresh the original booster on the training data and install the result.

      A new ``XGBClassifier`` configured with ``process_type="update"`` /
      ``updater="refresh"`` is fitted starting from the original model's
      booster, then installed as the last step of the optimized pipeline and
      stored on ``self.model_optimized``.

      Args:
          n_estimators: Number of trees to use. Falsy values (``None`` or 0)
              fall back to the original model's tree count.

      Returns:
          The original model's tree count: ``best_ntree_limit`` when it is
          available, otherwise ``n_estimators``.
      """
      y_column = self._ol_config.y_column
      train_data = self._data.train_data
      model_original: xgb.XGBClassifier = self._pipeline_original.steps[-1][1]

      # `best_ntree_limit` is only defined when the model was fitted with early
      # stopping; plain attribute access raises AttributeError otherwise.
      best_ntree_limit = getattr(model_original, "best_ntree_limit", None)
      ntree = model_original.n_estimators if best_ntree_limit is None else best_ntree_limit

      self.model_optimized = xgb.XGBClassifier(
          n_estimators=n_estimators if n_estimators else ntree,
          updater="refresh",
          process_type="update",
          refresh_leaf=True,
          learning_rate=self._ol_config.lr,
          random_state=self._ol_config.random_state,
      )
      self._pipeline_optimized.steps[-1] = ("classifier", self.model_optimized)
      with silent_print():
          self._pipeline_optimized.fit(
              train_data,
              train_data[y_column],
              classifier__verbose=False,
              # Continue from the original booster instead of training from scratch.
              classifier__xgb_model=model_original.get_booster(),
          )
      return ntree
  def train(self):
      """Refresh the model, then record test AUC/KS and train-vs-test PSI per tree count.

      After ``_train()`` the optimized pipeline is evaluated with
      ``ntree_limit`` = 1..ntree. The results are stored in
      ``self._df_param_optimized`` with the columns
      ``auc_test``, ``ks_test``, ``psi`` and ``ntree`` (one row per tree count).
      """
      y_column = self._ol_config.y_column
      train_data = self._data.train_data
      test_data = self._data.test_data
      test_y = test_data[y_column]  # loop-invariant
      df_param_columns = ["auc_test", "ks_test", "psi", "ntree"]
      self._df_param_optimized = pd.DataFrame(columns=df_param_columns)

      ntree = self._train()
      print(f"原模型一共有【{ntree}】棵树")

      # Collect rows and build the frame once: growing a DataFrame row by row
      # copies it on every insert and leaves the columns as object dtype.
      rows = []
      for n in tqdm(range(1, ntree + 1)):
          test_y_prob = self.prob(test_data, ntree_limit=n)
          psi = round(self.psi(train_data, test_data, print_sum=False, ntree_limit=n)['psi'].sum(), 3)
          perf = sc.perf_eva(test_y, test_y_prob, show_plot=False)
          rows.append(dict(zip(df_param_columns, [perf["AUC"], perf["KS"], psi, n])))

      self._df_param_optimized = pd.DataFrame(rows, columns=df_param_columns)
  def save(self):
      """Persist the configuration, the optimized pipeline and the XGBoost model.

      The XGBoost model is also saved on its own because, per the original
      author's note, dumping the pipeline directly goes wrong under
      incremental learning; ``_init`` restores it from that separate file.

      Raises:
          GeneralException: ``NOT_FOUND`` when there is no optimized pipeline.
      """
      # Validate before writing anything; the exception used to be built but never raised.
      if self._pipeline_optimized is None:
          raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"模型不存在")

      self._ol_config.config_save()

      path_model = self._ol_config.f_get_save_path(FileEnum.MODEL.value)
      joblib.dump(self._pipeline_optimized, path_model)
      print(f"model save to【{path_model}】success. ")

      # `model_optimized` is only assigned by `_train`; when it is missing, use
      # the classifier held in the last step of the optimized pipeline.
      model = getattr(self, "model_optimized", None)
      if model is None:
          model = self._pipeline_optimized.steps[-1][1]
      path_model = self._ol_config.f_get_save_path(FileEnum.MODEL_XGB.value)
      model.save_model(path_model)
      print(f"model save to【{path_model}】success. ")
  @staticmethod
  def load(path: str):
      """Rebuild a trainer from a previously saved directory.

      Args:
          path: Directory passed to ``OnlineLearningConfigEntity.from_config``;
              it is also set as the configuration's ``_path_resources``.

      Returns:
          A new ``OnlineLearningTrainerXgb`` created with the restored configuration.
      """
      restored_config = OnlineLearningConfigEntity.from_config(path)
      # Point the resource path at the directory we just loaded from.
      restored_config._path_resources = path
      trainer = OnlineLearningTrainerXgb(ol_config=restored_config)
      return trainer
  def report(self, ntree: int = None):
      """Select the final model and assemble (and optionally display) the report metrics.

      Args:
          ntree: Tree count forwarded to ``_f_get_best_model``; ``None`` lets
              that method pick one from the recorded metrics table.
      """
      train_data = self._data.train_data
      test_data = self._data.test_data
      self._f_get_best_model(self._df_param_optimized, ntree)

      if self._ol_config.jupyter_print:
          from IPython import display
          f_display_title(display, "模型优化过程")
          with df_print_nolimit():
              display.display(self._df_param_optimized)

      metric_value_dict = {}

      # Sample distribution
      distribution = self._data.get_distribution(self._ol_config.y_column)
      metric_value_dict["样本分布"] = MetricFucResultEntity(table=distribution, table_font_size=10,
                                                        table_cell_width=3)

      # AUC/KS comparison: new model first, then the original one
      for model_label in ("新模型", "原模型"):
          metric_value_dict[f"模型结果-{model_label}"] = self._f_get_metric_auc_ks(model_label)

      # Model-score PSI between train and test
      model_psi = self.psi(train_data, test_data, print_sum=False)
      img_path_psi = self._ol_config.f_get_save_path("model_psi.png")
      f_df_to_image(model_psi, img_path_psi)
      psi_total = model_psi["psi"].sum().round(3)
      metric_value_dict["模型稳定性"] = MetricFucResultEntity(table=model_psi, value=psi_total,
                                                         image_path=img_path_psi)

      # Score bins on the full data set: new model first, then the original one
      for model_label in ("新模型", "原模型"):
          metric_value_dict[f"分数分箱-建模数据-{model_label}"] = self._f_get_metric_gain(model_label)

      # Stress test (optional)
      if self._ol_config.stress_test:
          metric_value_dict["压力测试"] = self._f_get_stress_test()

      if self._ol_config.jupyter_print:
          self.jupyter_print(metric_value_dict)

      # Word report generation is currently disabled:
      # save_path = self._ol_config.f_get_save_path("OnlineLearning报告.docx")
      # ReportWord.generate_report(metric_value_dict, self._template_path, save_path=save_path)
      # print(f"模型报告文件储存路径:{save_path}")
  def jupyter_print(self, metric_value_dict: Dict[str, MetricFucResultEntity]):
      """Display the report metrics in a Jupyter notebook.

      Args:
          metric_value_dict: Metrics keyed by section name. The keys read here
              are ``样本分布``, ``模型结果-原模型``, ``模型结果-新模型``,
              ``模型稳定性``, ``分数分箱-建模数据-原模型``,
              ``分数分箱-建模数据-新模型`` and, optionally, ``压力测试``.
      """
      # The parameter used to be written `metric_value_dict=Dict[...]`, which made
      # the typing object the default value instead of annotating the argument.
      from IPython import display

      f_display_title(display, "样本分布")
      display.display(metric_value_dict["样本分布"].table)

      f_display_title(display, "模型结果")
      print(f"原模型")
      display.display(metric_value_dict["模型结果-原模型"].table)
      f_display_images_by_side(display, metric_value_dict["模型结果-原模型"].image_path)
      print(f"新模型")
      display.display(metric_value_dict["模型结果-新模型"].table)
      f_display_images_by_side(display, metric_value_dict["模型结果-新模型"].image_path)

      # Model PSI
      f_display_title(display, "模型psi")
      display.display(metric_value_dict["模型稳定性"].table)
      print(f"模型psi: {metric_value_dict['模型稳定性'].value}")

      f_display_title(display, "分数分箱")
      print(f"建模数据上分数分箱")
      print(f"原模型")
      display.display(metric_value_dict["分数分箱-建模数据-原模型"].table)
      print(f"新模型")
      display.display(metric_value_dict["分数分箱-建模数据-新模型"].table)

      # The stress-test section is only present when it was enabled in the config.
      if "压力测试" in metric_value_dict:
          f_display_title(display, "压力测试")
          display.display(metric_value_dict["压力测试"].table)
  277. if __name__ == "__main__":
  278. pass