|
@@ -0,0 +1,128 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+@author: yq
|
|
|
+@time: 2025/1/6
|
|
|
+@desc: 模型工具
|
|
|
+"""
|
|
|
+import numpy as np
|
|
|
+import pandas as pd
|
|
|
+import scorecardpy as sc
|
|
|
+from sklearn.metrics import roc_auc_score
|
|
|
+
|
|
|
+
|
|
|
def f_calcu_model_ks(data, y_column, sort_ascending):
    """Build a per-bin KS / LIFT table from a binned binary target.

    Rows of ``data`` are grouped by the ``MODEL_SCORE_BIN`` column, the groups
    are ordered by bin label, and good/bad counts are accumulated in that
    order to derive the cumulative shares, KS and LIFT of every bin.

    Args:
        data: DataFrame containing a ``MODEL_SCORE_BIN`` column and
            ``y_column``.
        y_column: Name of the target column. Its per-bin sum is used as the
            bad-sample count (presumably 0/1 encoded with 1 = bad; confirm
            against callers).
        sort_ascending: Forwarded to ``sort_index``; decides the direction in
            which bins are accumulated.

    Returns:
        DataFrame with one row per bin (the bin label is the first column)
        holding counts, ratios, cumulative figures, ``KS`` and ``LIFT``.
        All ratios are rounded to 4 decimals.
    """
    # String aliases replace the ``len`` / ``np.sum`` callables: recent pandas
    # versions warn when NumPy reducers are passed to ``agg``.
    # NOTE(review): if the bin labels are interval strings, ``sort_index``
    # orders them lexicographically, which may differ from score order.
    var_ks = (
        data.groupby('MODEL_SCORE_BIN')[y_column]
        .agg(['size', 'sum'])
        .sort_index(ascending=sort_ascending)
    )
    var_ks.columns = ['样本数', '坏样本数']
    var_ks['好样本数'] = var_ks['样本数'] - var_ks['坏样本数']
    var_ks['坏样本比例'] = (var_ks['坏样本数'] / var_ks['样本数']).round(4)
    var_ks['样本数比例'] = (var_ks['样本数'] / var_ks['样本数'].sum()).round(4)

    # Totals are broadcast to every row so they can be used column-wise below.
    var_ks['总坏样本数'] = var_ks['坏样本数'].sum()
    var_ks['总好样本数'] = var_ks['好样本数'].sum()
    var_ks['平均坏样本率'] = (var_ks['总坏样本数'] / var_ks['样本数'].sum()).round(4)

    var_ks['累计坏样本数'] = var_ks['坏样本数'].cumsum()
    var_ks['累计好样本数'] = var_ks['好样本数'].cumsum()
    var_ks['累计样本数'] = var_ks['样本数'].cumsum()
    var_ks['累计坏样本比例'] = (var_ks['累计坏样本数'] / var_ks['总坏样本数']).round(4)
    var_ks['累计好样本比例'] = (var_ks['累计好样本数'] / var_ks['总好样本数']).round(4)

    # KS per bin is the gap between the cumulative bad and good shares.
    var_ks['KS'] = (var_ks['累计坏样本比例'] - var_ks['累计好样本比例']).round(4)
    # LIFT compares the cumulative bad rate with the overall bad rate.
    var_ks['LIFT'] = ((var_ks['累计坏样本数'] / var_ks['累计样本数']) / var_ks['平均坏样本率']).round(4)
    return var_ks.reset_index()
|
|
|
+
|
|
|
+
|
|
|
def f_get_model_score_bin(df, card, bins=None):
    """Score ``df`` with a scorecard and label every row with its score bin.

    ``df`` is modified in place: the scores returned by
    ``sc.scorecard_ply`` are stored in a ``score`` column and the string
    label of each row's bin is stored in ``MODEL_SCORE_BIN``.

    Args:
        df: DataFrame to score; it receives the two new columns.
        card: Scorecard object forwarded to ``sc.scorecard_ply``.
        bins: Optional cut points for ``pd.cut``. When omitted, decile edges
            are derived from the scores (duplicate edges dropped) and the
            outermost edges are widened to -inf / +inf.

    Returns:
        Tuple ``(df, bins)``: the same frame that was passed in and the cut
        points that were used.
    """
    df['score'] = sc.scorecard_ply(df, card, print_step=0)

    if bins is None:
        decile_edges = pd.qcut(df['score'], q=10, retbins=True, duplicates="drop")[1]
        bins = list(decile_edges)
        # Open both ends so scores outside the observed range still get a bin.
        bins[0], bins[-1] = -np.inf, np.inf

    bin_labels = pd.cut(df['score'], bins=bins).astype(str)
    df['MODEL_SCORE_BIN'] = bin_labels.values
    return df, bins
|
|
|
+
|
|
|
+
|
|
|
def f_calcu_model_psi(df_train, df_test):
    """Compute the per-bin PSI between two binned samples.

    Both frames are grouped by ``MODEL_SCORE_BIN``. The bins of the two
    frames are aligned on their union, so a bin that is missing from one
    frame is reported with a count of 0 there instead of breaking the
    positional column assignment. The PSI term of such a bin evaluates to
    ``inf`` under the formula.

    Args:
        df_train: DataFrame with a ``MODEL_SCORE_BIN`` column (first sample).
        df_test: DataFrame with a ``MODEL_SCORE_BIN`` column (second sample).

    Returns:
        DataFrame with one row per bin, sorted by bin label, with columns
        ``MODEL_SCORE_BIN``, ``psi``, and the count and ratio of each
        sample. Ratios and PSI are rounded to 4 decimals; PSI is computed
        from the rounded ratios.
    """
    train_cnt = df_train.groupby('MODEL_SCORE_BIN')['MODEL_SCORE_BIN'].count()
    test_cnt = df_test.groupby('MODEL_SCORE_BIN')['MODEL_SCORE_BIN'].count()

    # Align both sides on the same, sorted set of bins.
    all_bins = train_cnt.index.union(test_cnt.index).sort_values()
    train_cnt = train_cnt.reindex(all_bins, fill_value=0)
    test_cnt = test_cnt.reindex(all_bins, fill_value=0)

    train_ratio = (train_cnt / train_cnt.sum()).round(4)
    test_ratio = (test_cnt / test_cnt.sum()).round(4)

    # A zero ratio on either side makes the log term infinite; silence the
    # NumPy warning and let the inf value surface in the result.
    with np.errstate(divide='ignore', invalid='ignore'):
        psi_values = ((train_ratio - test_ratio) * np.log(train_ratio / test_ratio)).round(4)

    return pd.DataFrame({
        'MODEL_SCORE_BIN': all_bins.to_numpy(),
        'psi': psi_values.to_numpy(),
        '训练样本数': train_cnt.to_numpy(),
        '测试样本数': test_cnt.to_numpy(),
        '训练样本数比例': train_ratio.to_numpy(),
        '测试样本数比例': test_ratio.to_numpy(),
    })
|
|
|
+
|
|
|
+
|
|
|
def f_strees_test(df: pd.DataFrame, sample_times: int, bad_rate_list: list, target_column: str, score_column: str,
                  sort_ascending=True) -> pd.DataFrame:
    """Stress-test KS and AUC by resampling good samples to target bad rates.

    For every requested bad rate all bad samples (target == 1) are kept and
    the good samples (target == 0) are over- or under-sampled so that the
    resulting frame has roughly that bad rate. This is repeated
    ``sample_times`` times with ``random_state`` 0..sample_times-1; each
    resample is re-binned on its own score quantiles, and KS and AUC are
    collected.

    Args:
        df: Source DataFrame containing ``target_column`` and ``score_column``.
        sample_times: Number of resamples per bad rate; must be >= 1.
        bad_rate_list: Bad rates to simulate. When empty, ten evenly spaced
            rates from 0.01 up to (excluding) twice the observed target
            mean are used. Each rate is rounded to 3 decimals.
        target_column: Name of the binary target column (1 = bad, 0 = good).
        score_column: Name of the score column.
        sort_ascending: Forwarded to ``f_calcu_model_ks``. When true the
            score is negated for the AUC computation.

    Returns:
        DataFrame with one row per bad rate holding the sample sizes of the
        last resample and the mean / max / min / standard deviation of AUC
        and KS, plus a ``mean ± 1.96 * std`` range for each.

    Raises:
        ValueError: If ``sample_times`` is below 1, if ``df`` lacks good or
            bad samples, or if a rounded bad rate is not inside (0, 1).
    """
    if sample_times < 1:
        raise ValueError("sample_times must be at least 1")

    # The class split never changes, so it is computed once up front.
    bad_df = df[df[target_column] == 1]
    good_df = df[df[target_column] == 0]
    bad_count = len(bad_df)
    good_count = len(good_df)
    if bad_count == 0 or good_count == 0:
        raise ValueError("df must contain both good (0) and bad (1) samples")

    target_rate = df[target_column].mean()
    if len(bad_rate_list) == 0:
        bad_rate_list = np.arange(0.01, target_rate * 2, target_rate * 2 / 10)

    rows = []
    for bad_rate in bad_rate_list:
        bad_rate = round(bad_rate, 3)
        if not 0 < bad_rate < 1:
            raise ValueError(f"bad rate must be inside (0, 1) after rounding, got {bad_rate}")

        # Number of good samples that yields the requested bad rate when all
        # bad samples are kept.
        good_sample_counts = int(bad_count / bad_rate) - bad_count
        oversample = bad_rate < target_rate
        if oversample:
            # More goods are needed than exist: take whole copies plus a
            # random fraction for the remainder.
            good_sample_times = good_sample_counts / good_count
            whole_copies = int(good_sample_times)
            remainder_frac = good_sample_times - whole_copies

        ks_list = []
        auc_list = []
        df_tmp = None
        for random_state in range(sample_times):
            if oversample:
                good_parts = [good_df] * whole_copies
                good_parts.append(good_df.sample(frac=remainder_frac, random_state=random_state))
                good_df_tmp = pd.concat(good_parts, ignore_index=True)
            else:
                good_df_tmp = good_df.sample(n=good_sample_counts, random_state=random_state)
            df_tmp = pd.concat([bad_df, good_df_tmp], ignore_index=True)

            # Decile cut points of the resampled scores, de-duplicated so
            # that pd.cut receives strictly increasing edges.
            deciles = df_tmp[score_column].quantile(np.arange(0.1, 1, 0.1)).round(4)
            score_cut_point = [-np.inf] + deciles.unique().tolist() + [np.inf]

            # f_calcu_model_ks groups by 'MODEL_SCORE_BIN', so the fresh bins
            # must be stored under exactly that name to take effect.
            # NOTE(review): the labels are strings, so the KS helper orders
            # them lexicographically; confirm this matches score order.
            df_tmp["MODEL_SCORE_BIN"] = pd.cut(df_tmp[score_column], score_cut_point).astype(str).values
            ks = f_calcu_model_ks(df_tmp, target_column, sort_ascending)["KS"].max()

            # roc_auc_score expects larger values for the positive class.
            auc_score = -df_tmp[score_column] if sort_ascending else df_tmp[score_column]
            auc = roc_auc_score(df_tmp[target_column], auc_score)

            ks_list.append(ks)
            auc_list.append(auc)

        row = {}
        row["违约率"] = bad_rate
        row["抽样次数"] = sample_times
        # Sample sizes are taken from the last resample of this bad rate.
        row["样本数"] = len(df_tmp)
        row["好样本数"] = (df_tmp[target_column] == 0).sum()
        row["坏样本数"] = (df_tmp[target_column] == 1).sum()

        row["平均AUC"] = np.mean(auc_list)
        row["最大AUC"] = max(auc_list)
        row["最小AUC"] = min(auc_list)
        row["AUC标准差"] = np.std(auc_list)
        low = row["平均AUC"] - row["AUC标准差"] * 1.96
        high = row["平均AUC"] + row["AUC标准差"] * 1.96
        row["95%置信区间AUC"] = f"{low: .4f} - {high: .4f}"

        row["平均KS"] = np.mean(ks_list)
        row["最大KS"] = max(ks_list)
        row["最小KS"] = min(ks_list)
        row["KS标准差"] = np.std(ks_list)
        low = row["平均KS"] - row["KS标准差"] * 1.96
        high = row["平均KS"] + row["KS标准差"] * 1.96
        row["95%置信区间KS"] = f"{low: .4f} - {high: .4f}"
        rows.append(row)
    return pd.DataFrame(rows)
|