yq 3 miesięcy temu
rodzic
commit
e7a8666d50
11 zmienionych plików z 483 dodań i 160 usunięć
  1. 76 0
      app.py
  2. 3 2
      commom/__init__.py
  3. 6 0
      commom/utils.py
  4. 3 0
      config/base_config.py
  5. BIN
      flow.png
  6. 6 12
      prompt/prompt.py
  7. 3 14
      requirements.txt
  8. 185 132
      strategy_parse.py
  9. 11 0
      webui/__init__.py
  10. 32 0
      webui/manager.py
  11. 158 0
      webui/utils.py

+ 76 - 0
app.py

@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/12/4
+@desc: 
+"""
+import matplotlib
+
+matplotlib.use('Agg')
+
+import gradio as gr
+
+from webui import f_project_is_exist, f_data_upload, engine, f_download_code, f_code_generate, f_get_sheet_data
+
+input_elems = set()
+elem_dict = {}
+
+with gr.Blocks() as demo:
+    gr.HTML('<h1 ><center><font size="5">Strategy-Flow-Simulation</font></center></h1>')
+    gr.HTML('<h2 ><center><font size="2">决策流快速仿真工具</font></center></h2>')
+    with gr.Tabs():
+        with gr.TabItem("策略仿真"):
+            with gr.Row():
+                with gr.Column():
+                    with gr.Row():
+                        project_name = gr.Textbox(label="项目名称", placeholder="请输入不重复的项目名称",
+                                                  info="项目名称将会被作为缓存目录名称,如果重复会导致结果被覆盖")
+
+                    with gr.Row():
+                        file_data = gr.File(label="策略文档", file_types=[".xlsx"], scale=3)
+                        sheet_name = gr.Dropdown(choices=["流程"], value="流程", label="策略查看", interactive=True,
+                                                 info="流程及节点信息查看", scale=1)
+
+                    with gr.Row():
+                        data_upload = gr.Dataframe(visible=False, label="当前上传数据", max_height=300)
+                    code_generate = gr.Button("生成代码", variant="primary")
+
+                    input_elems.update(
+                        {project_name, sheet_name, file_data, data_upload})
+                    elem_dict.update(dict(
+                        project_name=project_name,
+                        sheet_name=sheet_name,
+                        file_data=file_data,
+                        data_upload=data_upload,
+                        code_generate=code_generate
+                    ))
+                with gr.Column():
+                    with gr.Row():
+                        generate_progress = gr.Textbox(label="生成进度", scale=4)
+                        download_code = gr.DownloadButton(label="代码下载", variant="primary",
+                                                          visible=False, scale=1)
+                        file_report = gr.File(visible=False)
+                    with gr.Row():
+                        code_view = gr.Code()
+                    input_elems.update(
+                        {generate_progress, download_code, file_report})
+                    elem_dict.update(dict(
+                        generate_progress=generate_progress,
+                        download_report=download_code,
+                        file_report=file_report
+                    ))
+
+                    engine.add_elems(elem_dict)
+
+                    project_name.change(fn=f_project_is_exist, inputs=input_elems)
+                    sheet_name.change(fn=f_get_sheet_data, inputs=input_elems, outputs=[data_upload])
+                    file_data.upload(fn=f_data_upload, inputs=input_elems, outputs=[data_upload, sheet_name])
+                    code_generate.click(fn=f_code_generate, inputs=input_elems,
+                                        outputs=[generate_progress, code_view, download_code])
+                    download_code.click(fn=f_download_code, inputs=input_elems, outputs=download_code)
+
+            demo.queue(default_concurrency_limit=5)
+            demo.launch(share=False, show_error=True, server_name="0.0.0.0", server_port=18067)
+
+if __name__ == "__main__":
+    pass

+ 3 - 2
commom/__init__.py

@@ -6,6 +6,7 @@
 """
 from .llm_call import call_llm, f_file_upload
 from .user_exceptions import GeneralException
-from .utils import f_get_date, f_get_datetime, f_get_save_path
+from .utils import f_get_date, f_get_datetime, f_get_save_path, create_zip
 
-__all__ = ['GeneralException', 'f_get_date', 'f_get_datetime', 'f_get_save_path', 'call_llm', 'f_file_upload']
+__all__ = ['GeneralException', 'f_get_date', 'f_get_datetime', 'f_get_save_path', 'call_llm', 'f_file_upload',
+           'create_zip']

+ 6 - 0
commom/utils.py

@@ -7,6 +7,7 @@
 
 import datetime
 import os
+import zipfile
 
 import pytz
 
@@ -28,3 +29,8 @@ os.makedirs(base_dir, exist_ok=True)
 def f_get_save_path(file_name: str, sub_path=""):
     os.makedirs(os.path.join(base_dir, sub_path), exist_ok=True)
     return os.path.join(base_dir, sub_path, file_name)
+
+def create_zip(zip_name, files):
+    with zipfile.ZipFile(zip_name, 'w') as zipf:
+        for file in files:
+            zipf.write(file)

+ 3 - 0
config/base_config.py

@@ -4,6 +4,8 @@
 @time: 2022/10/24
 @desc: 
 """
+import os
+
 
 class BaseConfig:
     bot_url = "https://api.coze.cn/open_api/v2/chat"
@@ -11,6 +13,7 @@ class BaseConfig:
     token = "Bearer pat_HNBYQOWE5h4r1tzXi8S2PuY4ddoVRH3DpTbE3NsYBjtcWHTYw5ffrVmKPh26hSLW"
     bot_id = "7397344489205153807"
     file_upload_url = "https://api.coze.cn/v1/files/upload"
+    base_dir = os.path.join(".", "cache")
 
 
 

BIN
flow.png


+ 6 - 12
prompt/prompt.py

@@ -116,10 +116,10 @@ if __name__ == "__main__":
 """
 
 
-def f_get_prompt_parse_node(rules_name: str, rules: str, default_output:str):
-    return prompt_parse_node\
-        .replace("{rules_name}", rules_name)\
-        .replace("{rules}", rules)\
+def f_get_prompt_parse_node(rules_name: str, rules: str, default_output: str):
+    return prompt_parse_node \
+        .replace("{rules_name}", rules_name) \
+        .replace("{rules}", rules) \
         .replace("{default_output}", default_output)
 
 
@@ -135,14 +135,8 @@ def f_get_prompt_parse_flow_image(node_list: list):
         .replace("{node_func_map}", node_func_map) \
         .replace("{func_import}", func_import)
 
-def f_get_prompt_parse_flow(node_list: list, flow:str):
-    func = ""
-    node_func_map = ""
-    func_import = ""
-    for node_name, func_name, code in node_list:
-        func = f"{func}{code}\n\n"
-        node_func_map = f"{node_func_map}{node_name}: {func_name}\n"
-        func_import = f"{func_import}from {func_name} import {func_name}\n"
+
+def f_get_prompt_parse_flow(func: str, node_func_map: str, func_import: str, flow: str):
     return prompt_parse_flow.replace("{func}", func) \
         .replace("{node_func_map}", node_func_map) \
         .replace("{flow}", flow) \

+ 3 - 14
requirements.txt

@@ -1,15 +1,4 @@
-pymysql==1.0.2
-python-docx==0.8.11
 xlrd==1.2.0
-scorecardpy==0.1.9.7
-dataframe_image==0.1.14
-matplotlib==3.3.4
-numpy==1.19.5
-pandas==1.1.5
-scikit-learn==0.24.2
-pyhive==0.7.0
-thrift==0.21.0
-thrift-sasl==0.4.3
-seaborn==0.11.2
-contextvars==2.4
-tqdm==4.64.1
+gradio==5.8.0
+pandas==1.5.3
+tqdm==4.67.1

+ 185 - 132
strategy_parse.py

@@ -5,151 +5,204 @@
 @desc: 策略流节点解析
 """
 import json
+import os
 import re
 import time
-from tqdm import tqdm
+from typing import List
 
 import pandas as pd
 from PIL import Image
 from openpyxl import load_workbook
+from tqdm import tqdm
 
-from commom import f_get_save_path, call_llm, f_file_upload, GeneralException
+from commom import call_llm, f_file_upload, GeneralException, f_get_datetime, create_zip
+from config import BaseConfig
 from enums import ResultCodesEnum
 from prompt import f_get_prompt_parse_node, f_get_prompt_parse_flow, f_get_prompt_parse_flow_image
 
 
-def _f_parse_flow_image(ws, node_list: list):
-    image = ws._images[0]
-    img = Image.open(image.ref).convert("RGB")
-    save_path = f_get_save_path("流程图.png")
-    img.save(save_path)
-    time.sleep(1)
-    file_id = f_file_upload(save_path)
-    prompt = f_get_prompt_parse_flow_image(node_list)
-    print(prompt)
-    prompt = [
-        {
-            "type": "text",
-            "text": prompt
-        },
-        {
-            "type": "image",
-            "file_id": file_id
-        }
-    ]
-    prompt = json.dumps(prompt, ensure_ascii=False)
-    llm_answer = call_llm(prompt, "object_string")
-    print(llm_answer)
-    code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0]
-    save_path = f_get_save_path("flow.py")
-    with open(save_path, mode="w", encoding="utf8") as f:
-        f.write(code)
-
-    save_path = f_get_save_path("__init__.py")
-    with open(save_path, mode="w", encoding="utf8") as f:
-        f.write("")
-
-
-def _f_parse_flow(node_list: list, df: pd.DataFrame):
-    flow = ""
-    for _, row in df.iterrows():
-        strategy = row["策略流描述"]
-        flow = f"{flow}{strategy}\n"
-    flow = flow.strip()
-
-    prompt = f_get_prompt_parse_flow(node_list, flow)
-    print(prompt)
-    llm_answer = call_llm(prompt)
-    print(llm_answer)
-    code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0]
-    save_path = f_get_save_path("flow.py")
-    with open(save_path, mode="w", encoding="utf8") as f:
-        f.write(code)
-
-    save_path = f_get_save_path("__init__.py")
-    with open(save_path, mode="w", encoding="utf8") as f:
-        f.write("")
-
-
-def _f_parse_node(df: pd.DataFrame, node_name):
-    rules = ""
-    for idx, row in df.iterrows():
-        var_name = row["变量"]
-        var_name = var_name.replace("\n", " ")
-
-        rule_content = row["逻辑"]
-        rule_content = rule_content.replace("\n", " ")
-
-        rule_out = row["输出"]
-
-        notes_output = row["输出备注"]
-        if notes_output is None or notes_output != notes_output:
-            notes_output = ""
-        else:
-            notes_output = notes_output.replace("\n", " ")
-            notes_output = f" 结果备注: {notes_output}"
-
-        notes_input = row["输入备注"]
-        if notes_input is None or notes_input != notes_input:
-            notes_input = ""
-        else:
-            notes_input = notes_input.replace("\n", " ")
-            notes_input = f" 变量备注: {notes_input}"
-
-        rules = f"{rules}规则{idx + 1}: 变量:{var_name} 逻辑:{rule_content} 输出:{rule_out}{notes_input}{notes_output}\n"
-
-    default_output = list(df["默认输出"])[0]
-    if default_output is None or default_output != default_output:
-        default_output = ""
-    else:
-        default_output = str(default_output).replace("\n", " ")
-        default_output = f"{default_output}"
-
-    prompt = f_get_prompt_parse_node(node_name, rules, default_output)
-    print(prompt)
-    llm_answer = call_llm(prompt)
-    code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0]
-    func_name = re.findall(r"def (.*)\(data", code)[0]
-    save_path = f_get_save_path(f"{func_name}.py")
-    print(code)
-    with open(save_path, mode="w", encoding="utf8") as f:
-        f.write(code)
-    return func_name, code
-
-
-def f_parse_strategy_image(file_path):
-    wb = load_workbook(file_path)
-    excel = pd.ExcelFile(file_path)
-    sheet_names = excel.sheet_names
-    if "流程图" not in sheet_names:
-        GeneralException(ResultCodesEnum.NOT_FOUND, message=f"sheet【流程图】不存在")
-    node_list = []
-    for node_name in tqdm(sheet_names):
-        if node_name == "流程图":
-            continue
-        df = excel.parse(sheet_name=node_name)
-        func_name, code = _f_parse_node(df, node_name)
-        node_list.append((node_name, func_name, code))
-    _f_parse_flow_image(wb["流程图"], node_list)
-    wb.close()
-    excel.close()
+class StrategyParse:
 
+    def __init__(self, project_name: str = None, *args, **kwargs):
+        # 项目名称,和缓存路径有关
+        self._project_name = project_name
 
-def f_parse_strategy(file_path):
-    excel = pd.ExcelFile(file_path)
-    sheet_names = excel.sheet_names
-    if "流程" not in sheet_names:
-        GeneralException(ResultCodesEnum.NOT_FOUND, message=f"sheet【流程】不存在")
-    node_list = []
-    for node_name in tqdm(sheet_names):
-        if node_name == "流程":
-            continue
-        df = excel.parse(sheet_name=node_name)
-        func_name, code = _f_parse_node(df, node_name)
-        node_list.append((node_name, func_name, code))
-    _f_parse_flow(node_list, excel.parse(sheet_name="流程"))
-    excel.close()
+        if self._project_name is None or len(self._project_name) == 0:
+            self._base_dir = os.path.join(BaseConfig.base_dir, f"{f_get_datetime()}")
+        else:
+            self._base_dir = os.path.join(BaseConfig.base_dir, self._project_name)
+        os.makedirs(self._base_dir, exist_ok=True)
+
+    @property
+    def project_name(self):
+        return self._project_name
+
+    @property
+    def base_dir(self):
+        return self._base_dir
+
+    def _f_get_save_path(self, file_name: str) -> str:
+        path = os.path.join(self._base_dir, file_name)
+        return path
+
+    def _f_get_py_files(self, ):
+        py_files = []
+        file_name_list: List[str] = os.listdir(self._base_dir)
+        for file_name in file_name_list:
+            if ".py" in file_name:
+                py_files.append(os.path.join(self._base_dir, file_name))
+        return py_files
+
+    def _f_parse_flow_image(self, ws, node_list: list):
+        image = ws._images[0]
+        img = Image.open(image.ref).convert("RGB")
+        save_path = self._f_get_save_path("流程图.png")
+        img.save(save_path)
+        time.sleep(1)
+        file_id = f_file_upload(save_path)
+        prompt = f_get_prompt_parse_flow_image(node_list)
+        print(prompt)
+        prompt = [
+            {
+                "type": "text",
+                "text": prompt
+            },
+            {
+                "type": "image",
+                "file_id": file_id
+            }
+        ]
+        prompt = json.dumps(prompt, ensure_ascii=False)
+        llm_answer = call_llm(prompt, "object_string")
+        print(llm_answer)
+        code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0]
+        save_path = self._f_get_save_path("flow.py")
+        with open(save_path, mode="w", encoding="utf8") as f:
+            f.write(code)
+
+        save_path = self._f_get_save_path("__init__.py")
+        with open(save_path, mode="w", encoding="utf8") as f:
+            f.write("")
+
+    def _f_parse_flow(self, node_list: list, df: pd.DataFrame):
+
+        node_func_dict = {"流程": "flow.py"}
+        func = ""
+        node_func_map = ""
+        func_import = ""
+        for node_name, func_name, code in node_list:
+            node_func_dict[node_name] = f"{func_name}.py"
+            func = f"{func}{code}\n\n"
+            node_func_map = f"{node_func_map}{node_name}: {func_name}\n"
+            func_import = f"{func_import}from {func_name} import {func_name}\n"
+
+        save_path = self._f_get_save_path("node_func_dict.json")
+        with open(save_path, mode="w", encoding="utf8") as f:
+            f.write(json.dumps(node_func_dict, ensure_ascii=False))
+
+        flow = ""
+        for _, row in df.iterrows():
+            strategy = row["策略流描述"]
+            flow = f"{flow}{strategy}\n"
+        flow = flow.strip()
+
+        prompt = f_get_prompt_parse_flow(func, node_func_map, func_import, flow)
+        print(prompt)
+        llm_answer = call_llm(prompt)
+        print(llm_answer)
+        code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0]
+        save_path = self._f_get_save_path("flow.py")
+        with open(save_path, mode="w", encoding="utf8") as f:
+            f.write(code)
+
+        save_path = self._f_get_save_path("__init__.py")
+        with open(save_path, mode="w", encoding="utf8") as f:
+            f.write("")
+
+    def _f_parse_node(self, df: pd.DataFrame, node_name):
+        rules = ""
+        for idx, row in df.iterrows():
+            var_name = row["变量"]
+            var_name = var_name.replace("\n", " ")
+
+            rule_content = row["逻辑"]
+            rule_content = rule_content.replace("\n", " ")
+
+            rule_out = row["输出"]
+
+            notes_output = row["输出备注"]
+            if notes_output is None or notes_output != notes_output:
+                notes_output = ""
+            else:
+                notes_output = notes_output.replace("\n", " ")
+                notes_output = f" 结果备注: {notes_output}"
+
+            notes_input = row["输入备注"]
+            if notes_input is None or notes_input != notes_input:
+                notes_input = ""
+            else:
+                notes_input = notes_input.replace("\n", " ")
+                notes_input = f" 变量备注: {notes_input}"
+
+            rules = f"{rules}规则{idx + 1}: 变量:{var_name} 逻辑:{rule_content} 输出:{rule_out}{notes_input}{notes_output}\n"
+
+        default_output = list(df["默认输出"])[0]
+        if default_output is None or default_output != default_output:
+            default_output = ""
+        else:
+            default_output = str(default_output).replace("\n", " ")
+            default_output = f"{default_output}"
+
+        prompt = f_get_prompt_parse_node(node_name, rules, default_output)
+        print(prompt)
+        llm_answer = call_llm(prompt)
+        code = re.findall(r"```python\n(.*)\n```", llm_answer, flags=re.DOTALL)[0]
+        func_name = re.findall(r"def (.*)\(data", code)[0]
+        save_path = self._f_get_save_path(f"{func_name}.py")
+        print(code)
+        with open(save_path, mode="w", encoding="utf8") as f:
+            f.write(code)
+        return func_name, code
+
+    def _f_parse_strategy_image(self, file_path):
+        wb = load_workbook(file_path)
+        excel = pd.ExcelFile(file_path)
+        sheet_names = excel.sheet_names
+        if "流程图" not in sheet_names:
+            GeneralException(ResultCodesEnum.NOT_FOUND, message=f"sheet【流程图】不存在")
+        node_list = []
+        for node_name in tqdm(sheet_names):
+            if node_name == "流程图":
+                continue
+            df = excel.parse(sheet_name=node_name)
+            func_name, code = self._f_parse_node(df, node_name)
+            node_list.append((node_name, func_name, code))
+        self._f_parse_flow_image(wb["流程图"], node_list)
+        wb.close()
+        excel.close()
+
+    def f_parse_strategy(self, excel: pd.ExcelFile, progress=None):
+        sheet_names = excel.sheet_names
+        if "流程" not in sheet_names:
+            GeneralException(ResultCodesEnum.NOT_FOUND, message=f"sheet【流程】不存在")
+        node_list = []
+        for node_name in tqdm(sheet_names):
+            if node_name == "流程":
+                continue
+            df = excel.parse(sheet_name=node_name)
+            func_name, code = self._f_parse_node(df, node_name)
+            node_list.append((node_name, func_name, code))
+        if progress is not None:
+            progress(0.9)
+        self._f_parse_flow(node_list, excel.parse(sheet_name="流程"))
+
+        save_path = self._f_get_save_path("code.zip")
+        py_files = self._f_get_py_files()
+        create_zip(save_path, py_files)
 
 
 if __name__ == "__main__":
-    f_parse_strategy("./cache/策略节点配置3demo.xlsx")
+    excel = pd.ExcelFile("./cache/策略节点配置3demo.xlsx")
+    strategy_parse = StrategyParse()
+    strategy_parse.f_parse_strategy(excel)
+    excel.close()

+ 11 - 0
webui/__init__.py

@@ -0,0 +1,11 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/12/5
+@desc:
+
+"""
+from .manager import engine
+from .utils import f_project_is_exist, f_data_upload, f_download_code, f_code_generate, f_get_sheet_data
+
+__all__ = ['engine', 'f_project_is_exist', 'f_data_upload', 'f_code_generate', 'f_download_code', 'f_get_sheet_data']

+ 32 - 0
webui/manager.py

@@ -0,0 +1,32 @@
+from typing import Dict
+
+from gradio.components import Component
+
+
+class Manager:
+    def __init__(self) -> None:
+        self._id_to_elem: Dict[str, "Component"] = {}
+        self._elem_to_id: Dict["Component", str] = {}
+
+    def add_elems(self, elem_dict: Dict[str, "Component"]) -> None:
+        for elem_id, elem in elem_dict.items():
+            self._id_to_elem[elem_id] = elem
+            self._elem_to_id[elem] = elem_id
+
+    def get_elem_by_id(self, elem_id: str) -> "Component":
+        return self._id_to_elem[elem_id]
+
+    def _get_id_by_elem(self, elem: "Component") -> str:
+        return self._elem_to_id[elem]
+
+    def get(self, data, key):
+        return data[self.get_elem_by_id(key)]
+
+    def get_all(self, data) -> Dict:
+        all = {}
+        for k, v in self._id_to_elem.items():
+            all[k] = data[v]
+        return all
+
+
+engine = Manager()

+ 158 - 0
webui/utils.py

@@ -0,0 +1,158 @@
+# -*- coding: utf-8 -*-
+"""
+@author: yq
+@time: 2024/12/5
+@desc: 
+"""
+import os
+import shutil
+from typing import List
+
+import gradio as gr
+import pandas as pd
+
+from config import BaseConfig
+from strategy_parse import StrategyParse
+from .manager import engine
+
+DATA_SUB_DIR = "data"
+UPLOAD_DATA_PREFIX = "prefix_upload_data_"
+
+
+def _clean_base_dir(data):
+    base_dir = _get_base_dir(data)
+    file_name_list: List[str] = os.listdir(base_dir)
+    for file_name in file_name_list:
+        if file_name in [DATA_SUB_DIR]:
+            continue
+        file_path = os.path.join(base_dir, file_name)
+        if os.path.isdir(file_path):
+            shutil.rmtree(file_path)
+        else:
+            os.remove(file_path)
+
+
+def _check_save_dir(data):
+    project_name = engine.get(data, "project_name")
+    if project_name is None or len(project_name) == 0:
+        raise gr.Error(message='项目名称不能为空', duration=5)
+    return True
+
+
+def _get_prefix_file(save_path, prefix):
+    file_name_list: List[str] = os.listdir(save_path)
+    for file_name in file_name_list:
+        if prefix in file_name:
+            return os.path.join(save_path, file_name)
+
+
+def _get_base_dir(data):
+    project_name = engine.get(data, "project_name")
+    base_dir = os.path.join(BaseConfig.base_dir, project_name)
+    return base_dir
+
+
+def _get_upload_data(data) -> pd.ExcelFile:
+    base_dir = _get_base_dir(data)
+    save_path = os.path.join(base_dir, DATA_SUB_DIR)
+    file_path = _get_prefix_file(save_path, UPLOAD_DATA_PREFIX)
+    excel = pd.ExcelFile(file_path)
+    return excel
+
+
+def f_project_is_exist(data):
+    project_name = engine.get(data, "project_name")
+    if project_name is None or len(project_name) == 0:
+        gr.Warning(message='项目名称不能为空', duration=5)
+    elif os.path.exists(_get_base_dir(data)):
+        gr.Warning(message='项目名称已被使用', duration=5)
+
+
+def _get_save_path(data, file_name: str, sub_dir="", name_prefix=""):
+    base_dir = _get_base_dir(data)
+    save_path = os.path.join(base_dir, sub_dir)
+    os.makedirs(save_path, exist_ok=True)
+    # 有前缀标示的先删除
+    if name_prefix:
+        file = _get_prefix_file(save_path, name_prefix)
+        if file:
+            os.remove(file)
+    save_path = os.path.join(save_path, name_prefix + os.path.basename(file_name))
+    return save_path
+
+
+def f_data_upload(data):
+    if not _check_save_dir(data):
+        return
+    file_data = engine.get(data, "file_data")
+    data_path = _get_save_path(data, file_data.name, DATA_SUB_DIR, UPLOAD_DATA_PREFIX)
+    shutil.copy(file_data.name, data_path)
+    excel = _get_upload_data(data)
+    df = excel.parse(sheet_name="流程")
+    columns = excel.sheet_names
+    excel.close()
+    return {
+        engine.get_elem_by_id("data_upload"): gr.update(value=df, visible=True),
+        engine.get_elem_by_id("sheet_name"): gr.update(choices=columns),
+    }
+
+
+def f_get_sheet_data(data):
+    sheet_name = engine.get(data, "sheet_name")
+    excel = _get_upload_data(data)
+    df = excel.parse(sheet_name=sheet_name)
+    excel.close()
+    return {
+        engine.get_elem_by_id("data_upload"): gr.update(value=df, visible=True)
+    }
+
+
+def f_download_code(data):
+    file_path = _get_save_path(data, "code.zip")
+    if os.path.exists(file_path):
+        return {engine.get_elem_by_id("download_code"): gr.update(value=file_path)}
+    else:
+        raise FileNotFoundError(f"{file_path} not found.")
+
+
+def f_verify_param(data):
+    excel = _get_upload_data(data)
+    columns = excel.sheet_names
+    excel.close()
+    if "流程" not in columns:
+        raise gr.Error(message=f'【流程】sheet不能为空', duration=5)
+    return True
+
+
+def f_code_generate(data, progress=gr.Progress(track_tqdm=True)):
+    def _reset_component_state():
+        return {engine.get_elem_by_id("download_code"): gr.update(visible=False),
+                engine.get_elem_by_id("code_view"): gr.update(visible=False)
+                }
+
+    progress(0, desc="Starting")
+
+    all_param = engine.get_all(data)
+
+    # 清空储存目录
+    _clean_base_dir(data)
+    # 校验参数
+    if not f_verify_param(data):
+        yield _reset_component_state()
+
+    yield _reset_component_state()
+
+    progress(0.01)
+    excel = _get_upload_data(data)
+
+    strategy_parse = StrategyParse(**all_param)
+    strategy_parse.f_parse_strategy(excel, progress)
+
+    excel.close()
+
+    code_file_path = _get_save_path(data, "code.zip")
+
+    progress(1)
+
+    yield {engine.get_elem_by_id("generate_progress"): gr.update(value="生成完成"),
+           engine.get_elem_by_id("download_code"): gr.update(value=code_file_path, visible=True)}