yq 3 months ago
parent
commit
5eb918c12b
5 changed files with 143 additions and 22 deletions
  1. 13 1
      README.md
  2. 1 0
      commom/llm_call.py
  3. 2 2
      prompt/__init__.py
  4. 65 10
      prompt/prompt.py
  5. 62 9
      strategy_parse.py

+ 13 - 1
README.md

@@ -1,3 +1,15 @@
 # strategy-flow-simulation
 
-策略流模拟代码生成工具
+策略流模拟代码生成工具
+
+![img.png](flow.png)
+
+
+策略流描述  
+开始 --> 是否要客  
+是否要客 --> [满足条件: 1] 要客规则 --> 结束    
+是否要客 --> [满足条件: 0] 极优质客户  
+极优质客户 --> [满足条件: 拒绝] 通用规则  
+极优质客户 --> [满足条件: 接受] 极优质客户额度 --> 结束  
+通用规则 --> [满足条件: 0] 结束  
+通用规则 --> [满足条件: 1] 客群规则 --> 客群额度 --> 结束

+ 1 - 0
commom/llm_call.py

@@ -26,6 +26,7 @@ def f_file_upload(file_path: str):
 
 
 def call_llm(prompt: str, content_type="text"):
+    # content_type text object_string
     req_head = {
         "Authorization": BaseConfig.token,
         "Content-Type": "application/json",

+ 2 - 2
prompt/__init__.py

@@ -4,6 +4,6 @@
 @time: 2023/9/19
 @desc: 
 """
-from .prompt import f_get_prompt_parse_node, f_get_prompt_parse_flow
+from .prompt import f_get_prompt_parse_node, f_get_prompt_parse_flow,  f_get_prompt_parse_flow_image
 
-__all__ = ['f_get_prompt_parse_node','f_get_prompt_parse_flow']
+__all__ = ['f_get_prompt_parse_node','f_get_prompt_parse_flow', 'f_get_prompt_parse_flow_image']

+ 65 - 10
prompt/prompt.py

@@ -12,6 +12,7 @@ prompt_parse_node = """
 ```
 示例:
 规则集名称:通用规则
+规则集默认输出:-1
 规则集:
 规则1: 变量:年龄 age 逻辑:年龄小于18或大于等于65 输出:1 结果备注: 1代表好
 规则2: 变量:欠税总额 qsze 逻辑:欠税总额大于500 输出:3 结果备注: 3代表一般
@@ -34,10 +35,12 @@ def handle_general_rules(data:dict):
     if fyjgwjqs is not None:
         if fyjgwjqs > 10:
             return 2
+    return -1
 ```
 
 待处理规则:
 规则集名称:{rules_name}
+规则集默认输出:{default_output}
 规则集:
 {rules}
 请根据```内的示例以及待处理规则中的变量、计算逻辑及输出,用Python语言生成功能函数代码。
@@ -48,7 +51,8 @@ def handle_general_rules(data:dict):
 4、规则里有 结果备注 的请在函数中的 输出结果备注 处进行备注,没有的则无输出结果备注
 """
 
-prompt_parse_flow = """
+# 流程图
+prompt_parse_flow_image = """
 # 角色
 你是一个专业的Python代码生成器,能够根据给定的流程图和函数内容,用Python语言生成完整的流程执行代码。
 
@@ -68,20 +72,70 @@ if __name__ == "__main__":
     print(main(data))
     
 返回结果要求:
-1、function1、function2、function3指代给定的函数,请替换为具体的函数名称。
-2、针对流程分支情况请参考给定的函数内容及函数的输出结果备注。
-3、分支的最终节点,无实际意义,分支最终节点直接返回其上一个节点输入的结果。
-4、只返回函数代码,不要多余的输出。
-5、代码逻辑应严格按照图中的逻辑,不要自行添加多余的逻辑。
-6、代码的语法要符合python的语法规范,返回的代码应该是可执行的。
+1、针对流程分支情况请参考给定的函数内容及函数的输出结果备注。
+2、分支的最终节点,无实际意义,分支最终节点直接返回其上一个节点输入的结果。
+3、只返回函数代码,不要多余的输出。
+4、代码逻辑应严格按照图中的逻辑,不要自行添加多余的逻辑。
+5、代码的语法要符合python的语法规范,返回的代码应该是可执行的。
+"""
+
+# 流程描述
+prompt_parse_flow = """
+# 角色
+你是一个专业的Python代码生成器,能够根据给定的流程图和函数内容,用Python语言生成完整的流程执行代码。
+
+给定的函数内容:
+{func}
+函数与节点对应关系:
+{node_func_map}
+
+流程描述:
+```
+{flow}
+```
+流程描述说明:
+1、[满足条件: *] 为判断上一个节点的输出是否符合条件
+
+请把```括号内的流程描述的内容转换为python代码,流程中的各个节点参考给定的函数内容与函数与节点对应关系,并构造测试数据data,代码样式如下。
+
+{func_import}
+
+def main(data: dict):
+    流程逻辑
+
+if __name__ == "__main__":
+    data = 测试数据
+    print(main(data))
+
+返回结果要求:
+1、针对流程分支情况请参考给定的函数内容及函数的输出结果备注。
+2、开始节点与结束节点,无实际意义,结束节点直接返回其上一个节点输出的结果。
+3、只返回函数代码,不要多余的输出。
+4、代码逻辑应严格按照图中的逻辑,不要自行添加多余的逻辑。
+5、代码的语法要符合python的语法规范,返回的代码应该是可执行的。
 """
 
 
-def f_get_prompt_parse_node(rules_name: str, rules: str):
-    return prompt_parse_node.replace("{rules_name}", rules_name).replace("{rules}", rules)
+def f_get_prompt_parse_node(rules_name: str, rules: str, default_output:str):
+    return prompt_parse_node\
+        .replace("{rules_name}", rules_name)\
+        .replace("{rules}", rules)\
+        .replace("{default_output}", default_output)
+
 
+def f_get_prompt_parse_flow_image(node_list: list):
+    func = ""
+    node_func_map = ""
+    func_import = ""
+    for node_name, func_name, code in node_list:
+        func = f"{func}{code}\n\n"
+        node_func_map = f"{node_func_map}{node_name}: {func_name}\n"
+        func_import = f"{func_import}from {func_name} import {func_name}\n"
+    return prompt_parse_flow_image.replace("{func}", func) \
+        .replace("{node_func_map}", node_func_map) \
+        .replace("{func_import}", func_import)
 
-def f_get_prompt_parse_flow(node_list: list):
+def f_get_prompt_parse_flow(node_list: list, flow:str):
     func = ""
     node_func_map = ""
     func_import = ""
@@ -91,4 +145,5 @@ def f_get_prompt_parse_flow(node_list: list):
         func_import = f"{func_import}from {func_name} import {func_name}\n"
     return prompt_parse_flow.replace("{func}", func) \
         .replace("{node_func_map}", node_func_map) \
+        .replace("{flow}", flow) \
         .replace("{func_import}", func_import)

+ 62 - 9
strategy_parse.py

@@ -15,17 +15,17 @@ from openpyxl import load_workbook
 
 from commom import f_get_save_path, call_llm, f_file_upload, GeneralException
 from enums import ResultCodesEnum
-from prompt import f_get_prompt_parse_node, f_get_prompt_parse_flow
+from prompt import f_get_prompt_parse_node, f_get_prompt_parse_flow, f_get_prompt_parse_flow_image
 
 
-def _f_parse_flow(ws, node_list: list):
+def _f_parse_flow_image(ws, node_list: list):
     image = ws._images[0]
     img = Image.open(image.ref).convert("RGB")
     save_path = f_get_save_path("流程图.png")
     img.save(save_path)
     time.sleep(1)
     file_id = f_file_upload(save_path)
-    prompt = f_get_prompt_parse_flow(node_list)
+    prompt = f_get_prompt_parse_flow_image(node_list)
     print(prompt)
     prompt = [
         {
@@ -50,6 +50,27 @@ def _f_parse_flow(ws, node_list: list):
         f.write("")
 
 
+def _f_parse_flow(node_list: list, df: pd.DataFrame):
+    flow = ""
+    for _, row in df.iterrows():
+        strategy = row["策略流描述"]
+        flow = f"{flow}{strategy}\n"
+    flow = flow.strip()
+
+    prompt = f_get_prompt_parse_flow(node_list, flow)
+    print(prompt)
+    llm_answer = call_llm(prompt)
+    print(llm_answer)
+    code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0]
+    save_path = f_get_save_path("flow.py")
+    with open(save_path, mode="w", encoding="utf8") as f:
+        f.write(code)
+
+    save_path = f_get_save_path("__init__.py")
+    with open(save_path, mode="w", encoding="utf8") as f:
+        f.write("")
+
+
 def _f_parse_node(df: pd.DataFrame, node_name):
     rules = ""
     for idx, row in df.iterrows():
@@ -61,14 +82,30 @@ def _f_parse_node(df: pd.DataFrame, node_name):
 
         rule_out = row["输出"]
 
-        notes_output = row["结果备注"]
+        notes_output = row["输出备注"]
         if notes_output is None or notes_output != notes_output:
             notes_output = ""
         else:
+            notes_output = notes_output.replace("\n", " ")
             notes_output = f" 结果备注: {notes_output}"
-        rules = f"{rules}规则{idx + 1}: 变量:{var_name} 逻辑:{rule_content} 输出:{rule_out}{notes_output}\n"
 
-    prompt = f_get_prompt_parse_node(node_name, rules)
+        notes_input = row["输入备注"]
+        if notes_input is None or notes_input != notes_input:
+            notes_input = ""
+        else:
+            notes_input = notes_input.replace("\n", " ")
+            notes_input = f" 变量备注: {notes_input}"
+
+        rules = f"{rules}规则{idx + 1}: 变量:{var_name} 逻辑:{rule_content} 输出:{rule_out}{notes_input}{notes_output}\n"
+
+    default_output = list(df["默认输出"])[0]
+    if default_output is None or default_output != default_output:
+        default_output = ""
+    else:
+        default_output = str(default_output).replace("\n", " ")
+        default_output = f"{default_output}"
+
+    prompt = f_get_prompt_parse_node(node_name, rules, default_output)
     print(prompt)
     llm_answer = call_llm(prompt)
     code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0]
@@ -80,7 +117,7 @@ def _f_parse_node(df: pd.DataFrame, node_name):
     return func_name, code
 
 
-def f_parse_strategy(file_path):
+def f_parse_strategy_image(file_path):
     wb = load_workbook(file_path)
     excel = pd.ExcelFile(file_path)
     sheet_names = excel.sheet_names
@@ -93,10 +130,26 @@ def f_parse_strategy(file_path):
         df = excel.parse(sheet_name=node_name)
         func_name, code = _f_parse_node(df, node_name)
         node_list.append((node_name, func_name, code))
-    _f_parse_flow(wb["流程图"], node_list)
+    _f_parse_flow_image(wb["流程图"], node_list)
     wb.close()
     excel.close()
 
 
+def f_parse_strategy(file_path):
+    excel = pd.ExcelFile(file_path)
+    sheet_names = excel.sheet_names
+    if "流程" not in sheet_names:
+        GeneralException(ResultCodesEnum.NOT_FOUND, message=f"sheet【流程】不存在")
+    node_list = []
+    for node_name in tqdm(sheet_names):
+        if node_name == "流程":
+            continue
+        df = excel.parse(sheet_name=node_name)
+        func_name, code = _f_parse_node(df, node_name)
+        node_list.append((node_name, func_name, code))
+    _f_parse_flow(node_list, excel.parse(sheet_name="流程"))
+    excel.close()
+
+
 if __name__ == "__main__":
-    f_parse_strategy("./cache/策略节点配置demo.xlsx")
+    f_parse_strategy("./cache/策略节点配置3demo.xlsx")