123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- # -*- coding:utf-8 -*-
- """
- @author: isaacqyang
- @time: 2023/12/28
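- @desc: Export a Feishu (Lark) online document to .docx, fill its table placeholders, and upload the result to TOS.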
- """
- import json
- import os
- import time
- from urllib.parse import unquote
- from docx import Document
- from docx.enum.text import WD_ALIGN_PARAGRAPH
- import lark_oapi as lark
- import tos
- from lark_oapi.api.drive.v1 import CreateExportTaskRequest, ExportTask, CreateExportTaskResponse, GetExportTaskRequest, \
- GetExportTaskResponse, DownloadExportTaskRequest, DownloadExportTaskResponse
- from tos import HttpMethodType
- from config import BaseConfig
def f_upload_file(save_path) -> str:
    """Upload a local file to the configured TOS bucket and return a download URL.

    Credentials, endpoint, region and bucket name are read from ``BaseConfig``.
    The object key is the base name of ``save_path``.

    Args:
        save_path: Path of the local file to upload.

    Returns:
        The pre-signed GET URL of the uploaded object. If any exception is
        raised while uploading or signing, it is reported with ``print`` and
        the function returns ``None`` instead.
    """
    access_key = BaseConfig.cos_access_key_id
    secret_key = BaseConfig.cos_secret_access_key
    endpoint = BaseConfig.endpoint
    region = BaseConfig.region
    bucket = BaseConfig.bucket_name

    try:
        # All bucket and object operations go through a TosClientV2 instance.
        storage = tos.TosClientV2(access_key, secret_key, endpoint, region)
        key = os.path.basename(save_path)
        storage.put_object_from_file(bucket, key, save_path)
        signed = storage.pre_signed_url(HttpMethodType.Http_Method_Get, bucket, key)
        return signed.signed_url
    except tos.exceptions.TosClientError as err:
        # Client-side failure: usually invalid request parameters or a network problem.
        print('fail with client error, message:{}, cause: {}'.format(err.message, err.cause))
    except tos.exceptions.TosServerError as err:
        # Server-side failure: print every detail the error carries. The
        # request id is what pinpoints the problem, so it is always reported.
        server_details = (
            ('fail with server error, code: {}', 'code'),
            ('error with request id: {}', 'request_id'),
            ('error with message: {}', 'message'),
            ('error with http code: {}', 'status_code'),
            ('error with ec: {}', 'ec'),
            ('error with request url: {}', 'request_url'),
        )
        for template, attribute in server_details:
            print(template.format(getattr(err, attribute)))
    except Exception as err:
        print('fail with unknown error: {}'.format(err))
    return None
def create_word_table(json_data):
    """Build a python-docx table from a JSON table description.

    Args:
        json_data: Either a JSON document (``str`` / ``bytes``) or an
            already-parsed ``dict`` with the keys:

            * ``"data"``: non-empty list of rows, each row a list of cell
              values. Values are converted with ``str()`` and stripped;
              ``None`` becomes an empty cell. Rows may differ in length; the
              table is as wide as the longest row.
            * ``"merges"`` (optional): list of dicts with ``start_row``,
              ``start_column``, ``end_row`` and ``end_column`` naming the two
              corner cells of a region to merge.

    Returns:
        The table object. It is created inside a scratch ``Document`` that is
        discarded, so the caller is expected to re-attach the table elsewhere.

    Raises:
        ValueError: If ``"data"`` contains no rows.
    """
    # Accept both raw JSON text and an already-decoded mapping.
    if isinstance(json_data, (str, bytes, bytearray)):
        json_data = json.loads(json_data)

    rows = json_data['data']
    if not rows:
        raise ValueError("create_word_table: 'data' must contain at least one row")

    # Size the table by the widest row so a longer later row cannot index
    # past the last column.
    column_count = max(len(row) for row in rows)

    document = Document()
    table = document.add_table(rows=len(rows), cols=column_count)

    # Fill the cells, trimming surrounding whitespace.
    for i, row in enumerate(rows):
        for j, cell_value in enumerate(row):
            text = "" if cell_value is None else str(cell_value)
            table.cell(i, j).text = text.strip()

    # Grid borders, every cell centered.
    table.style = 'Table Grid'
    for table_row in table.rows:
        for cell in table_row.cells:
            cell.paragraphs[0].alignment = WD_ALIGN_PARAGRAPH.CENTER

    # Merge the requested regions. ``or []`` also covers an explicit null.
    for merge in json_data.get('merges') or []:
        start_cell = table.cell(merge['start_row'], merge['start_column'])
        end_cell = table.cell(merge['end_row'], merge['end_column'])
        start_cell.merge(end_cell)

        # After the merge the cell holds several paragraphs; collapse them
        # into a single one so the text is not broken across lines.
        all_text = "".join(paragraph.text for paragraph in start_cell.paragraphs)
        for paragraph in start_cell.paragraphs:
            element = paragraph._element
            element.getparent().remove(element)

        # The replacement paragraph is new, so centering must be re-applied
        # to keep the merged cell consistent with the rest of the table.
        start_cell.add_paragraph(all_text).alignment = WD_ALIGN_PARAGRAPH.CENTER

    return table
def _strip_placeholder(paragraph, placeholder):
    """Remove ``placeholder`` from the runs of ``paragraph``.

    Also handles a placeholder whose characters are spread over several runs,
    in which case no single ``run.text`` contains it.
    """
    runs = paragraph.runs
    if any(placeholder in run.text for run in runs):
        for run in runs:
            run.text = run.text.replace(placeholder, "")
        return

    combined = "".join(run.text for run in runs)
    if placeholder in combined:
        # Collapse the text into the first run so the replacement sees the
        # whole placeholder, then blank the remaining runs.
        runs[0].text = combined.replace(placeholder, "")
        for run in runs[1:]:
            run.text = ""


def f_doc_export(token: str, request_id: str = "", data: object = None) -> str:
    """Export a Feishu (Lark) online document to Word, insert tables, upload it.

    The function creates an export task for ``token``, queries the task until
    it yields a file token, downloads the file into
    ``BaseConfig.word_save_dir``, replaces each ``{TABLE_PLACEHOLDER}_<i>``
    marker with a table built by ``create_word_table`` from ``data[i - 1]``,
    saves the document and uploads it with ``f_upload_file``.

    Args:
        token: Token of the online document to export.
        request_id: Not used by this function; kept so existing callers
            continue to work.
        data: Sequence of table descriptions (JSON strings, or dicts/lists
            that are serialized to JSON first). ``None`` means no tables.

    Returns:
        The value returned by ``f_upload_file`` for the saved document, or
        ``None`` when a Lark API call fails or the export never produces a
        file token.
    """
    app_id = BaseConfig.app_id
    app_secret = BaseConfig.app_secret
    word_save_dir = BaseConfig.word_save_dir

    initial_wait_seconds = 5   # pause before the first status query
    poll_interval_seconds = 2  # pause between further status queries
    max_polls = 6              # upper bound on status queries

    client = lark.Client.builder() \
        .app_id(app_id) \
        .app_secret(app_secret) \
        .log_level(lark.LogLevel.DEBUG) \
        .build()

    # Step 1: create the export task.
    request1: CreateExportTaskRequest = CreateExportTaskRequest.builder() \
        .request_body(ExportTask.builder()
                      .file_extension("docx")
                      .token(token)
                      .type("docx")
                      .build()) \
        .build()
    response1: CreateExportTaskResponse = client.drive.v1.export_task.create(request1)
    if not response1.success():
        lark.logger.error(
            f"client.drive.v1.export_task.create failed, code: {response1.code}, msg: {response1.msg}, log_id: {response1.get_log_id()}, resp: \n{json.dumps(json.loads(response1.raw.content), indent=4, ensure_ascii=False)}")
        return None
    lark.logger.info(lark.JSON.marshal(response1.data, indent=4))
    ticket = response1.data.ticket

    # Step 2: query the task until it reports a file token. A single query
    # after a fixed sleep fails whenever the export takes longer than the sleep.
    request2: GetExportTaskRequest = GetExportTaskRequest.builder() \
        .ticket(ticket) \
        .token(token) \
        .build()
    time.sleep(initial_wait_seconds)
    file_token = None
    for attempt in range(max_polls):
        response2: GetExportTaskResponse = client.drive.v1.export_task.get(request2)
        if not response2.success():
            lark.logger.error(
                f"client.drive.v1.export_task.get failed, code: {response2.code}, msg: {response2.msg}, log_id: {response2.get_log_id()}, resp: \n{json.dumps(json.loads(response2.raw.content), indent=4, ensure_ascii=False)}")
            return None
        lark.logger.info(lark.JSON.marshal(response2.data, indent=4))
        result = response2.data.result
        file_token = result.file_token if result is not None else None
        if file_token:
            break
        if attempt < max_polls - 1:
            time.sleep(poll_interval_seconds)
    if not file_token:
        lark.logger.error(
            f"export task {ticket} returned no file_token after {max_polls} queries")
        return None

    # Step 3: download the exported file.
    request3: DownloadExportTaskRequest = DownloadExportTaskRequest.builder() \
        .file_token(file_token) \
        .build()
    response3: DownloadExportTaskResponse = client.drive.v1.export_task.download(request3)
    if not response3.success():
        lark.logger.error(
            f"client.drive.v1.export_task.download failed, code: {response3.code}, msg: {response3.msg}, log_id: {response3.get_log_id()}")
        return None

    # The file name comes from the response; keep only its last path
    # component so it cannot point outside the save directory.
    file_name = os.path.basename(unquote(response3.file_name))
    if word_save_dir:
        os.makedirs(word_save_dir, exist_ok=True)
    save_path = os.path.join(word_save_dir, file_name)
    with open(save_path, "wb") as f:
        f.write(response3.file.read())
    # NOTE(review): pause kept from the original implementation; its purpose
    # is not evident from this file.
    time.sleep(2)

    # Step 4: replace the table placeholders.
    tables = data if data is not None else []
    doc = Document(save_path)
    placeholder_prefix = "{TABLE_PLACEHOLDER}_"
    # Highest index first: "..._1" is a substring of "..._10", so handling
    # the longer markers first keeps the short one from matching inside them.
    for i in range(len(tables), 0, -1):
        placeholder = f"{placeholder_prefix}{i}"
        for paragraph in doc.paragraphs:
            if placeholder not in paragraph.text:
                continue
            _strip_placeholder(paragraph, placeholder)
            table_data = tables[i - 1]
            # create_word_table parses JSON text; serialize parsed input first.
            if isinstance(table_data, (dict, list)):
                table_data = json.dumps(table_data, ensure_ascii=False)
            table = create_word_table(table_data)
            # Move the generated table so it directly follows the marker paragraph.
            paragraph._element.addnext(table._tbl)
            break  # only the first paragraph carrying this marker is handled
    doc.save(save_path)
    time.sleep(2)

    # Step 5: upload the finished document and hand back its download URL.
    word_download_url = f_upload_file(save_path)
    return word_download_url
if __name__ == "__main__":
    # Manual smoke test. request_id and data are passed explicitly because
    # the call fails with TypeError when they are omitted; an empty list
    # means no table placeholders are filled.
    f_doc_export('YKNBdbs10oA3pCxTdnAczcvOnxc', request_id='', data=[])
    # f_upload_file("/root/project/coze_znjd/大模型企业调查报告/1.docx")
|