# -*- coding: utf-8 -*- """ @author: yq @time: 2024/12/18 @desc: 策略流节点解析 """ import json import os import re import time from typing import List import pandas as pd from PIL import Image from openpyxl import load_workbook from tqdm import tqdm from commom import call_llm, f_file_upload, GeneralException, f_get_datetime, f_create_zip from config import BaseConfig from enums import ResultCodesEnum from prompt import f_get_prompt_parse_node, f_get_prompt_parse_flow, f_get_prompt_parse_flow_image class StrategyParse: def __init__(self, project_name: str = None, *args, **kwargs): # 项目名称,和缓存路径有关 self._project_name = project_name if self._project_name is None or len(self._project_name) == 0: self._base_dir = os.path.join(BaseConfig.base_dir, f"{f_get_datetime()}") else: self._base_dir = os.path.join(BaseConfig.base_dir, self._project_name) os.makedirs(self._base_dir, exist_ok=True) @property def project_name(self): return self._project_name @property def base_dir(self): return self._base_dir def _f_get_save_path(self, file_name: str) -> str: path = os.path.join(self._base_dir, file_name) return path def _f_get_py_files(self, ): py_files = [] file_name_list: List[str] = os.listdir(self._base_dir) for file_name in file_name_list: if ".py" in file_name: py_files.append(os.path.join(self._base_dir, file_name)) return py_files def _f_parse_flow_image(self, ws, node_list: list): image = ws._images[0] img = Image.open(image.ref).convert("RGB") save_path = self._f_get_save_path("流程图.png") img.save(save_path) time.sleep(1) file_id = f_file_upload(save_path) prompt = f_get_prompt_parse_flow_image(node_list) print(prompt) prompt = [ { "type": "text", "text": prompt }, { "type": "image", "file_id": file_id } ] prompt = json.dumps(prompt, ensure_ascii=False) llm_answer = call_llm(prompt, "object_string") print(llm_answer) code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0] save_path = self._f_get_save_path("flow.py") with open(save_path, mode="w", encoding="utf8") as f: f.write(code) save_path = self._f_get_save_path("__init__.py") with open(save_path, mode="w", encoding="utf8") as f: f.write("") def _f_parse_flow(self, node_list: list, df: pd.DataFrame): node_func_dict = {BaseConfig.flow_sheet_name: "flow.py"} func = "" node_func_map = "" func_import = "" for node_name, func_name, code in node_list: node_func_dict[node_name] = f"{func_name}.py" func = f"{func}{code}\n\n" node_func_map = f"{node_func_map}{node_name}: {func_name}\n" func_import = f"{func_import}from {func_name} import {func_name}\n" save_path = self._f_get_save_path(BaseConfig.node_map_name) with open(save_path, mode="w", encoding="utf8") as f: f.write(json.dumps(node_func_dict, ensure_ascii=False)) flow = "" for _, row in df.iterrows(): strategy = row["策略流描述"] flow = f"{flow}{strategy}\n" flow = flow.strip() prompt = f_get_prompt_parse_flow(func, node_func_map, func_import, flow) print(prompt) llm_answer = call_llm(prompt) print(llm_answer) code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0] save_path = self._f_get_save_path("flow.py") with open(save_path, mode="w", encoding="utf8") as f: f.write(code) save_path = self._f_get_save_path("__init__.py") with open(save_path, mode="w", encoding="utf8") as f: f.write("") def _f_parse_node(self, df: pd.DataFrame, node_name): rules = "" for idx, row in df.iterrows(): var_name = row["变量"] var_name = var_name.replace("\n", " ") rule_content = row["逻辑"] rule_content = rule_content.replace("\n", " ") rule_out = row["输出"] notes_output = row["输出备注"] if notes_output is None or notes_output != notes_output: notes_output = "" else: notes_output = notes_output.replace("\n", " ") notes_output = f" 结果备注: {notes_output}" notes_input = row["输入备注"] if notes_input is None or notes_input != notes_input: notes_input = "" else: notes_input = notes_input.replace("\n", " ") notes_input = f" 变量备注: {notes_input}" rules = f"{rules}规则{idx + 1}: 变量:{var_name} 逻辑:{rule_content} 输出:{rule_out}{notes_input}{notes_output}\n" default_output = list(df["默认输出"])[0] if default_output is None or default_output != default_output: default_output = "" else: default_output = str(default_output).replace("\n", " ") default_output = f"{default_output}" prompt = f_get_prompt_parse_node(node_name, rules, default_output) print(prompt) llm_answer = call_llm(prompt) code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0] func_name = re.findall(r"def (.*)\(data", code)[0] save_path = self._f_get_save_path(f"{func_name}.py") print(code) with open(save_path, mode="w", encoding="utf8") as f: f.write(code) return func_name, code def _f_parse_strategy_image(self, file_path): wb = load_workbook(file_path) excel = pd.ExcelFile(file_path) sheet_names = excel.sheet_names if BaseConfig.flow_sheet_name not in sheet_names: raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"sheet【{BaseConfig.flow_sheet_name}】不存在") node_list = [] for node_name in tqdm(sheet_names): if node_name == BaseConfig.flow_sheet_name: continue df = excel.parse(sheet_name=node_name) func_name, code = self._f_parse_node(df, node_name) node_list.append((node_name, func_name, code)) self._f_parse_flow_image(wb[BaseConfig.flow_sheet_name], node_list) wb.close() excel.close() def f_parse_strategy(self, excel: pd.ExcelFile, progress=None): sheet_names = excel.sheet_names if BaseConfig.flow_sheet_name not in sheet_names: raise GeneralException(ResultCodesEnum.NOT_FOUND, message=f"sheet【{BaseConfig.flow_sheet_name}】不存在") node_list = [] for node_name in tqdm(sheet_names): if node_name == BaseConfig.flow_sheet_name: continue df = excel.parse(sheet_name=node_name) func_name, code = self._f_parse_node(df, node_name) node_list.append((node_name, func_name, code)) if progress is not None: progress(0.9) self._f_parse_flow(node_list, excel.parse(sheet_name=BaseConfig.flow_sheet_name)) save_path = self._f_get_save_path(BaseConfig.code_zip_name) py_files = self._f_get_py_files() f_create_zip(save_path, py_files) if __name__ == "__main__": excel = pd.ExcelFile("./cache/策略节点配置3demo.xlsx") strategy_parse = StrategyParse() strategy_parse.f_parse_strategy(excel) excel.close()