123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- # -*- coding: utf-8 -*-
- """
- @author: yq
- @time: 2024/12/18
- @desc: 策略流节点解析
- """
- import json
- import os
- import re
- import time
- from typing import List
- import pandas as pd
- from PIL import Image
- from openpyxl import load_workbook
- from tqdm import tqdm
- from commom import call_llm, f_file_upload, GeneralException, f_get_datetime, f_create_zip
- from config import BaseConfig
- from enums import ResultCodesEnum
- from prompt import f_get_prompt_parse_node, f_get_prompt_parse_flow, f_get_prompt_parse_flow_image
class StrategyParse:
    """Generate Python source files from a strategy workbook with the help of an LLM.

    Every sheet except ``BaseConfig.flow_sheet_name`` is treated as one node:
    its rows are rendered into a prompt, the LLM answer is expected to contain
    a fenced ``python`` block, and that code is written to
    ``<func_name>.py`` inside the instance's working directory.  The flow sheet
    is then turned into ``flow.py`` and all ``.py`` files are zipped.
    """

    def __init__(self, project_name: str = None, *args, **kwargs):
        """Create the working directory for this run.

        Args:
            project_name: Name of the sub-directory under
                ``BaseConfig.base_dir``.  When ``None`` or empty, the value
                returned by ``f_get_datetime()`` is used instead.
            *args: Accepted and ignored.
            **kwargs: Accepted and ignored.
        """
        # The project name decides where generated files are cached.
        self._project_name = project_name
        if not self._project_name:
            self._base_dir = os.path.join(BaseConfig.base_dir, f"{f_get_datetime()}")
        else:
            self._base_dir = os.path.join(BaseConfig.base_dir, self._project_name)
        os.makedirs(self._base_dir, exist_ok=True)

    @property
    def project_name(self):
        """Project name passed to the constructor (may be ``None``)."""
        return self._project_name

    @property
    def base_dir(self):
        """Directory in which all generated files are stored."""
        return self._base_dir

    def _f_get_save_path(self, file_name: str) -> str:
        """Return the path of ``file_name`` inside the working directory."""
        return os.path.join(self._base_dir, file_name)

    def _f_get_py_files(self, ):
        """Return the sorted paths of all ``*.py`` entries in the working directory.

        Only names that END with ``.py`` are returned, so ``.pyc`` files or
        backups such as ``x.py.bak`` are not packed into the zip.
        """
        return [
            os.path.join(self._base_dir, file_name)
            for file_name in sorted(os.listdir(self._base_dir))
            if file_name.endswith(".py")
        ]

    @staticmethod
    def _f_is_missing(value) -> bool:
        """Return True for an empty cell value (``None`` / NaN / NA)."""
        return value is None or bool(pd.isna(value))

    @staticmethod
    def _f_cell_text(value) -> str:
        """Render a cell as single-line text; a missing cell becomes ``""``.

        Non-string values (e.g. numbers) are converted with ``str`` so they do
        not fail on ``.replace``.
        """
        if StrategyParse._f_is_missing(value):
            return ""
        return str(value).replace("\n", " ")

    @staticmethod
    def _f_extract_code(llm_answer: str) -> str:
        """Return the content of the fenced ``python`` block in ``llm_answer``.

        Raises:
            IndexError: If the answer has no such block.  The type matches
                what the former bare ``[0]`` lookup raised.
        """
        blocks = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)
        if not blocks:
            raise IndexError("LLM answer does not contain a ```python code block")
        return blocks[0]

    def _f_save_flow_code(self, code: str):
        """Write ``flow.py`` with ``code`` and an empty ``__init__.py``."""
        with open(self._f_get_save_path("flow.py"), mode="w", encoding="utf8") as f:
            f.write(code)
        with open(self._f_get_save_path("__init__.py"), mode="w", encoding="utf8") as f:
            f.write("")

    def _f_parse_nodes(self, excel: pd.ExcelFile) -> list:
        """Parse every node sheet of ``excel``.

        Returns:
            A list of ``(node_name, func_name, code)`` tuples, one per sheet
            other than the flow sheet, in sheet order.

        Raises:
            GeneralException: If the flow sheet is not in the workbook.
        """
        sheet_names = excel.sheet_names
        if BaseConfig.flow_sheet_name not in sheet_names:
            raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"sheet【{BaseConfig.flow_sheet_name}】不存在")
        node_list = []
        for node_name in tqdm(sheet_names):
            # The flow sheet describes the flow, not a node.
            if node_name == BaseConfig.flow_sheet_name:
                continue
            df = excel.parse(sheet_name=node_name)
            func_name, code = self._f_parse_node(df, node_name)
            node_list.append((node_name, func_name, code))
        return node_list

    # Unused
    def _f_parse_flow_image(self, ws, node_list: list):
        """Generate ``flow.py`` from the first image embedded in worksheet ``ws``.

        The image is saved as ``流程图.png``, uploaded with ``f_file_upload``
        and sent to the LLM together with a text prompt built from
        ``node_list``.

        Raises:
            IndexError: If the sheet has no image or the answer has no code block.
        """
        image = ws._images[0]
        img = Image.open(image.ref).convert("RGB")
        save_path = self._f_get_save_path("流程图.png")
        img.save(save_path)
        # NOTE(review): presumably gives the file time to settle before upload - confirm.
        time.sleep(1)
        file_id = f_file_upload(save_path)
        text_prompt = f_get_prompt_parse_flow_image(node_list)
        print(text_prompt)
        prompt = json.dumps(
            [
                {"type": "text", "text": text_prompt},
                {"type": "image", "file_id": file_id},
            ],
            ensure_ascii=False,
        )
        llm_answer = call_llm(prompt, "object_string")
        print(llm_answer)
        self._f_save_flow_code(self._f_extract_code(llm_answer))

    # Unused
    def _f_parse_strategy_image(self, file_path):
        """Parse all node sheets of ``file_path`` and build the flow from its image.

        Both handles on the file are closed even when parsing fails.

        Raises:
            GeneralException: If the flow sheet is not in the workbook.
        """
        wb = load_workbook(file_path)
        try:
            excel = pd.ExcelFile(file_path)
            try:
                node_list = self._f_parse_nodes(excel)
                self._f_parse_flow_image(wb[BaseConfig.flow_sheet_name], node_list)
            finally:
                excel.close()
        finally:
            wb.close()

    def _f_parse_node(self, df: pd.DataFrame, node_name):
        """Ask the LLM for the code of one node and save it as ``<func_name>.py``.

        Args:
            df: Sheet content with the columns ``变量``, ``逻辑``, ``输出``,
                ``输出备注``, ``输入备注`` and ``默认输出``.
            node_name: Sheet name, forwarded to the prompt builder.

        Returns:
            ``(func_name, code)`` where ``func_name`` is taken from the
            generated ``def <name>(data`` line.

        Raises:
            IndexError: If the answer has no code block or the code has no
                ``def <name>(data`` line.
        """
        rule_lines = []
        # Number rules by position so the result does not depend on df's index.
        for rule_no, (_, row) in enumerate(df.iterrows(), start=1):
            var_name = self._f_cell_text(row["变量"])
            rule_content = self._f_cell_text(row["逻辑"])
            rule_out = row["输出"]
            raw_out_note = row["输出备注"]
            if self._f_is_missing(raw_out_note):
                notes_output = ""
            else:
                notes_output = f" 结果备注: {self._f_cell_text(raw_out_note)}"
            raw_in_note = row["输入备注"]
            if self._f_is_missing(raw_in_note):
                notes_input = ""
            else:
                notes_input = f" 变量备注: {self._f_cell_text(raw_in_note)}"
            rule_lines.append(
                f"规则{rule_no}: 变量:{var_name} 逻辑:{rule_content} 输出:{rule_out}{notes_input}{notes_output}\n"
            )
        rules = "".join(rule_lines)

        # Only the first row's default output is used; an empty sheet yields "".
        default_output = self._f_cell_text(next(iter(df["默认输出"]), None))

        # Build the prompt.
        prompt = f_get_prompt_parse_node(node_name, rules, default_output)
        print(prompt)
        # Call the LLM.
        llm_answer = call_llm(prompt)
        # Extract the code part.
        code = self._f_extract_code(llm_answer)
        # Extract the function name.
        func_names = re.findall(r"def (.*)\(data", code)
        if not func_names:
            raise IndexError(f"no 'def <name>(data' found in the code generated for node {node_name!r}")
        func_name = func_names[0]
        # Save the node code.
        save_path = self._f_get_save_path(f"{func_name}.py")
        print(code)
        with open(save_path, mode="w", encoding="utf8") as f:
            f.write(code)
        return func_name, code

    def _f_parse_flow(self, node_list: list, df: pd.DataFrame):
        """Write the node map file and ask the LLM for ``flow.py``.

        Args:
            node_list: ``(node_name, func_name, code)`` tuples of the parsed nodes.
            df: Flow sheet content; its ``策略流描述`` column is joined line
                by line into the flow description.

        Raises:
            IndexError: If the LLM answer has no code block.
        """
        node_func_dict = {BaseConfig.flow_sheet_name: "flow.py"}
        node_func_dict.update({node_name: f"{func_name}.py" for node_name, func_name, _ in node_list})
        func = "".join(f"{code}\n\n" for _, _, code in node_list)
        node_func_map = "".join(f"{node_name}: {func_name}\n" for node_name, func_name, _ in node_list)
        func_import = "".join(f"from {func_name} import {func_name}\n" for _, func_name, _ in node_list)

        save_path = self._f_get_save_path(BaseConfig.node_map_name)
        with open(save_path, mode="w", encoding="utf8") as f:
            f.write(json.dumps(node_func_dict, ensure_ascii=False))

        strategy_column = "策略流描述"
        flow = "".join(f"{row[strategy_column]}\n" for _, row in df.iterrows()).strip()
        # Build the prompt.
        prompt = f_get_prompt_parse_flow(func, node_func_map, func_import, flow)
        print(prompt)
        # Call the LLM.
        llm_answer = call_llm(prompt)
        print(llm_answer)
        # Extract and save the code.
        self._f_save_flow_code(self._f_extract_code(llm_answer))

    def f_parse_strategy(self, excel: pd.ExcelFile, progress=None):
        """Parse a whole strategy workbook and zip the generated code.

        Args:
            excel: Open workbook; it is not closed here.
            progress: Optional callable, invoked once with ``0.9`` after all
                node sheets have been parsed.

        Raises:
            GeneralException: If the flow sheet is not in the workbook.
        """
        # Parse every node.
        node_list = self._f_parse_nodes(excel)
        if progress is not None:
            progress(0.9)
        # Parse the flow.
        self._f_parse_flow(node_list, excel.parse(sheet_name=BaseConfig.flow_sheet_name))
        # Pack the generated files.
        save_path = self._f_get_save_path(BaseConfig.code_zip_name)
        py_files = self._f_get_py_files()
        f_create_zip(save_path, py_files)
if __name__ == "__main__":
    # Demo run: parse the bundled template workbook into a fresh,
    # timestamp-named working directory (no project name is given).
    # The context manager closes the workbook even if parsing raises.
    with pd.ExcelFile("./template/demo.xlsx") as excel:
        strategy_parse = StrategyParse()
        strategy_parse.f_parse_strategy(excel)
|