# -*- coding:utf-8 -*-
"""
@author: yq
@time: 2023/12/28
@desc: feature utilities
"""
import re
from typing import List

import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

# Standardized cut-point grids, keyed by grid name.
FORMAT_DICT = {
    # ratio type: -1 to 1
    "bin_rate1": np.arange(-1, 1 + 0.1, 0.1),

    # count type 1: 0 to 10
    "bin_cnt1": np.arange(0, 11, 1),
    # count type 2: 0 to 20
    "bin_cnt2": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 17, 20],
    # count type 3: 0 to 50
    "bin_cnt3": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 25, 30, 35, 40, 45, 50],
    # count type 4: 0 to 100
    "bin_cnt4": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 15, 20, 30, 40, 50, 80, 100],

    # amount type 1: 0 to 10,000
    "bin_amt1": np.arange(0, 1.1e4, 1e3),
    # amount type 2: 0 to 50,000
    "bin_amt2": np.arange(0, 5.5e4, 5e3),
    # amount type 3: 0 to 100,000
    "bin_amt3": np.arange(0, 11e4, 1e4),
    # amount type 4: 0 to 200,000
    "bin_amt4": [0, 1e4, 2e4, 3e4, 4e4, 5e4, 8e4, 10e4, 15e4, 20e4],
    # amount type 5: 0 to 1,000,000
    "bin_amt5": [0, 5e4, 10e4, 15e4, 20e4, 25e4, 30e4, 40e4, 50e4, 100e4],

    # age type
    "bin_age": [20, 25, 30, 35, 40, 45, 50, 55, 60, 65],
}


# Coarse binning
def f_format_bin(data_describe: pd.Series):
    """Pick the tightest standardized cut-point grid covering the 10%-90% range.

    Args:
        data_describe: Summary statistics that must contain the labels
            ``"10%"`` and ``"90%"``.

    Returns:
        The cut points from ``FORMAT_DICT`` whose ``[min, max]`` span contains
        both percentiles and whose max is the smallest among such grids, or
        ``None`` when no grid covers the range. On equal maxima the entry
        that appears first in ``FORMAT_DICT`` is kept.
    """
    percent10 = data_describe["10%"]
    percent90 = data_describe["90%"]

    best = None  # (grid name, grid max) of the tightest covering grid so far
    for name, cut_points in FORMAT_DICT.items():
        bin_min = min(cut_points)
        bin_max = max(cut_points)
        if not (bin_min <= percent10 and percent90 <= bin_max):
            continue
        # Strict ">" so that ties keep the earlier grid.
        if best is None or best[1] > bin_max:
            best = (name, bin_max)

    if best is None:
        return None
    return FORMAT_DICT[best[0]]


def f_format_value(points, raw_v):
    """Snap ``raw_v`` to the left edge of the interval it falls into.

    Args:
        points: Cut points; assumed to be sorted ascending (every grid in
            ``FORMAT_DICT`` is).
        raw_v: The raw value to standardize.

    Returns:
        The left cut point of the half-open interval ``[left, right)`` that
        contains ``raw_v``; the last cut point when ``raw_v`` exceeds it;
        ``raw_v`` unchanged when no comparison matches (below the first cut
        point, equal to the last one, NaN, or fewer than two cut points).
    """
    format_v = raw_v
    # Walk consecutive (left, right) pairs; later matches overwrite earlier ones.
    for v_left, v_right in zip(points, points[1:]):
        if v_left <= raw_v < v_right:
            # Left-edge rule: a value inside a bin maps to the bin's left edge.
            format_v = v_left
        elif raw_v > v_right:
            # Beyond this bin: clamp to its right edge (may be refined later).
            format_v = v_right
    return format_v


class OneHot:
    """One-hot encode a single DataFrame column with scikit-learn's encoder."""

    def __init__(self) -> None:
        self._one_hot_encoder = OneHotEncoder()

    def fit(self, data: pd.DataFrame, x_column: str) -> None:
        """Learn the categories of ``data[x_column]`` and derive column names.

        Args:
            data: Frame containing the column to encode.
            x_column: Name of the column to encode.
        """
        self._x_column = x_column
        self._one_hot_encoder.fit(data[x_column].to_numpy().reshape(-1, 1))
        # Output columns are named "<x_column>(<category>)" with "[", "]" and
        # "<" stripped (presumably for downstream libraries that reject these
        # characters in feature names -- confirm).
        self._columns_onehot = [
            re.sub(r"[\[\]<]", "", f"{x_column}({i})")
            for i in self._one_hot_encoder.categories_[0]
        ]

    def encoder(self, data: pd.DataFrame) -> None:
        """Add the one-hot columns to ``data`` in place.

        ``fit`` must have been called first. Nothing is returned; ``data``
        gains one column per fitted category.

        Args:
            data: Frame containing the column that was passed to ``fit``.
        """
        one_hot_x = self._one_hot_encoder.transform(
            data[self._x_column].to_numpy().reshape(-1, 1)
        )
        one_hot_x = one_hot_x.toarray()
        for idx, column_name in enumerate(self._columns_onehot):
            data[column_name] = one_hot_x[:, idx]

    @property
    def columns_onehot(self) -> List[str]:
        """Names of the generated one-hot columns (available after ``fit``)."""
        return self._columns_onehot