# -*- coding:utf-8 -*-
"""
@author: isaacqyang
@time: 2023/12/28
@desc: Export a Feishu (Lark) online document to a .docx file, insert tables
       at numbered placeholders and upload the result to TOS object storage.
"""
import json
import os
import re
import time
from typing import Optional
from urllib.parse import unquote

import lark_oapi as lark
import tos
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from lark_oapi.api.drive.v1 import CreateExportTaskRequest, ExportTask, CreateExportTaskResponse, GetExportTaskRequest, \
    GetExportTaskResponse, DownloadExportTaskRequest, DownloadExportTaskResponse
from tos import HttpMethodType

from config import BaseConfig

# Number of times the export task is queried before giving up, and the pause
# (in seconds) taken before each query. The first query happens after one
# interval, which matches the fixed 5 second wait this module used originally.
_EXPORT_POLL_ATTEMPTS = 5
_EXPORT_POLL_INTERVAL = 5

# Prefix of the numbered table placeholders, e.g. "{TABLE_PLACEHOLDER}_1".
_TABLE_PLACEHOLDER_PREFIX = "{TABLE_PLACEHOLDER}_"


def f_upload_file(save_path) -> Optional[str]:
    """Upload a local file to the configured TOS bucket.

    The object key is the base name of ``save_path``; connection settings are
    read from ``BaseConfig``.

    Args:
        save_path: Path of the local file to upload.

    Returns:
        A pre-signed GET URL for the uploaded object, or ``None`` when the
        upload fails (diagnostics are printed in that case).
    """
    ak = BaseConfig.cos_access_key_id
    sk = BaseConfig.cos_secret_access_key
    endpoint = BaseConfig.endpoint
    region = BaseConfig.region
    bucket_name = BaseConfig.bucket_name
    try:
        # All bucket and object operations go through a TosClientV2 instance.
        client = tos.TosClientV2(ak, sk, endpoint, region)
        object_key = os.path.basename(save_path)
        client.put_object_from_file(bucket_name, object_key, save_path)
        pre_signed_url_output = client.pre_signed_url(HttpMethodType.Http_Method_Get, bucket_name, object_key)
        return pre_signed_url_output.signed_url
    except tos.exceptions.TosClientError as e:
        # Client-side failure: usually an invalid request parameter or a network problem.
        print('fail with client error, message:{}, cause: {}'.format(e.message, e.cause))
    except tos.exceptions.TosServerError as e:
        # Server-side failure: the response carries the detailed error information.
        print('fail with server error, code: {}'.format(e.code))
        # The request id pinpoints the failing request, so it is always logged.
        print('error with request id: {}'.format(e.request_id))
        print('error with message: {}'.format(e.message))
        print('error with http code: {}'.format(e.status_code))
        print('error with ec: {}'.format(e.ec))
        print('error with request url: {}'.format(e.request_url))
    except Exception as e:
        print('fail with unknown error: {}'.format(e))
    # Every failure path ends here; make the "no URL" result explicit.
    return None


def create_word_table(json_data):
    """Build a python-docx table from a table description.

    Args:
        json_data: A JSON string, or an already-parsed dict, with:
            ``data``   - list of rows, each row a list of cell values;
            ``merges`` - optional list of dicts with the keys ``start_row``,
                         ``start_column``, ``end_row`` and ``end_column``
                         describing rectangular cell ranges to merge.

    Returns:
        The created table. It belongs to a temporary ``Document``; callers
        move its ``_tbl`` element into the target document.

    Raises:
        ValueError: If ``json_data`` is a string that is not valid JSON, or
            if ``data`` contains no rows.
        KeyError: If the ``data`` key (or a merge key) is missing.
    """
    # Accept both the serialized and the parsed form of the description.
    if isinstance(json_data, (str, bytes, bytearray)):
        json_data = json.loads(json_data)

    rows = json_data['data']
    if not rows:
        raise ValueError("table 'data' must contain at least one row")
    # Size the table by the widest row so ragged input cannot overflow it.
    column_count = max(len(row) for row in rows)

    document = Document()
    table = document.add_table(rows=len(rows), cols=column_count)

    # Fill in the cell text, trimming surrounding whitespace. Non-string
    # values (numbers, null) are converted instead of raising.
    for i, row in enumerate(rows):
        for j, cell_value in enumerate(row):
            text = "" if cell_value is None else str(cell_value)
            table.cell(i, j).text = text.strip()

    # NOTE(review): the style is applied inside the temporary document; whether
    # it renders after the table is moved presumably depends on the target
    # document also defining 'Table Grid' - confirm.
    table.style = 'Table Grid'
    for table_row in table.rows:
        for cell in table_row.cells:
            cell.paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER

    # Merge cell ranges; a missing or null 'merges' entry means nothing to merge.
    for merge in json_data.get('merges') or []:
        start_cell = table.cell(merge['start_row'], merge['start_column'])
        end_cell = table.cell(merge['end_row'], merge['end_column'])
        start_cell.merge(end_cell)

        # After merging, the first cell can hold several paragraphs. Collapse
        # them into a single paragraph so the text is not broken across lines.
        merged_paragraphs = start_cell.paragraphs
        all_text = "".join(paragraph.text for paragraph in merged_paragraphs)
        for paragraph in merged_paragraphs:
            element = paragraph._element
            element.getparent().remove(element)
        combined = start_cell.add_paragraph(all_text)
        # Keep merged cells centered like every other cell in the table.
        combined.alignment = WD_ALIGN_PARAGRAPH.CENTER
    return table


def _export_docx(client, token: str, save_dir: str) -> Optional[str]:
    """Export the Feishu document ``token`` as docx and save it in ``save_dir``.

    Returns the path of the saved file, or ``None`` if any API call fails or
    the export never yields a file token (the reason is logged).
    """
    # Step 1: create the export task.
    create_request: CreateExportTaskRequest = CreateExportTaskRequest.builder() \
        .request_body(ExportTask.builder()
                      .file_extension("docx")
                      .token(token)
                      .type("docx")
                      .build()) \
        .build()
    create_response: CreateExportTaskResponse = client.drive.v1.export_task.create(create_request)
    if not create_response.success():
        lark.logger.error(
            f"client.drive.v1.export_task.create failed, code: {create_response.code}, "
            f"msg: {create_response.msg}, log_id: {create_response.get_log_id()}, resp: \n"
            f"{json.dumps(json.loads(create_response.raw.content), indent=4, ensure_ascii=False)}")
        return None
    lark.logger.info(lark.JSON.marshal(create_response.data, indent=4))
    ticket = create_response.data.ticket

    # Step 2: the export runs asynchronously, so query the task until it
    # reports a file token instead of assuming one fixed wait is enough.
    query_request: GetExportTaskRequest = GetExportTaskRequest.builder() \
        .ticket(ticket) \
        .token(token) \
        .build()
    file_token = None
    for _ in range(_EXPORT_POLL_ATTEMPTS):
        time.sleep(_EXPORT_POLL_INTERVAL)
        query_response: GetExportTaskResponse = client.drive.v1.export_task.get(query_request)
        if not query_response.success():
            lark.logger.error(
                f"client.drive.v1.export_task.get failed, code: {query_response.code}, "
                f"msg: {query_response.msg}, log_id: {query_response.get_log_id()}, resp: \n"
                f"{json.dumps(json.loads(query_response.raw.content), indent=4, ensure_ascii=False)}")
            return None
        lark.logger.info(lark.JSON.marshal(query_response.data, indent=4))
        result = query_response.data.result
        file_token = result.file_token if result is not None else None
        if file_token:
            break
    if not file_token:
        lark.logger.error(
            f"export task {ticket} returned no file_token after {_EXPORT_POLL_ATTEMPTS} queries")
        return None

    # Step 3: download the exported file.
    download_request: DownloadExportTaskRequest = DownloadExportTaskRequest.builder() \
        .file_token(file_token) \
        .build()
    download_response: DownloadExportTaskResponse = client.drive.v1.export_task.download(download_request)
    if not download_response.success():
        lark.logger.error(
            f"client.drive.v1.export_task.download failed, code: {download_response.code}, msg: {download_response.msg}, log_id: {download_response.get_log_id()}")
        return None

    # The file name comes from the server response; keep only its base name so
    # it can never point outside save_dir.
    file_name = os.path.basename(unquote(download_response.file_name))
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    save_path = os.path.join(save_dir, file_name)
    with open(save_path, "wb") as f:
        f.write(download_response.file.read())
    return save_path


def _remove_placeholder(paragraph, pattern) -> None:
    """Delete the text matched by ``pattern`` from ``paragraph``'s runs."""
    runs = paragraph.runs
    for run in runs:
        if pattern.search(run.text):
            run.text = pattern.sub("", run.text)
    # A placeholder can be split across several runs, in which case no single
    # run matched above. Collapse the run text into the first run to remove it.
    joined = "".join(run.text for run in runs)
    if runs and pattern.search(joined):
        runs[0].text = pattern.sub("", joined)
        for run in runs[1:]:
            run.text = ""


def _insert_tables(save_path: str, data) -> None:
    """Replace ``{TABLE_PLACEHOLDER}_<n>`` in the document with the n-th table of ``data``."""
    doc = Document(save_path)
    for index, table_data in enumerate(data, start=1):
        # The lookahead stops "..._1" from matching "..._10", "..._11", etc.
        pattern = re.compile(re.escape(f"{_TABLE_PLACEHOLDER_PREFIX}{index}") + r"(?!\d)")
        for paragraph in doc.paragraphs:
            if not pattern.search(paragraph.text):
                continue
            table = create_word_table(table_data)
            _remove_placeholder(paragraph, pattern)
            # Place the table directly after the paragraph that held the placeholder.
            paragraph._element.addnext(table._tbl)
            break  # only the first occurrence of each placeholder is replaced
    doc.save(save_path)


def f_doc_export(token: str, request_id: str = "", data: object = None) -> Optional[str]:
    """Export a Feishu online document to Word, insert tables and upload it.

    Args:
        token: Token of the Feishu document to export.
        request_id: Caller-supplied request identifier; not used by this
            function and kept for interface compatibility.
        data: Optional sequence of table descriptions (JSON strings or dicts
            accepted by ``create_word_table``). The n-th entry replaces the
            placeholder ``{TABLE_PLACEHOLDER}_<n>``. ``None`` or an empty
            sequence leaves the exported document untouched.

    Returns:
        The download URL returned by ``f_upload_file``, or ``None`` when the
        export or the upload fails.
    """
    app_id = BaseConfig.app_id
    app_secret = BaseConfig.app_secret
    word_save_dir = BaseConfig.word_save_dir
    client = lark.Client.builder() \
        .app_id(app_id) \
        .app_secret(app_secret) \
        .log_level(lark.LogLevel.DEBUG) \
        .build()

    save_path = _export_docx(client, token, word_save_dir)
    if save_path is None:
        return None

    if data:
        _insert_tables(save_path, data)

    word_download_url = f_upload_file(save_path)
    return word_download_url


if __name__ == "__main__":
    f_doc_export('YKNBdbs10oA3pCxTdnAczcvOnxc')