123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
# -*- coding:utf-8 -*-
"""
@author: isaacqyang
@time: 2023/12/28
@desc: Export a Feishu (Lark) online document to a local Word (.docx) file.
"""
import json
import os
import time
from typing import Optional

import lark_oapi as lark
from lark_oapi.api.drive.v1 import (
    CreateExportTaskRequest,
    CreateExportTaskResponse,
    DownloadExportTaskRequest,
    DownloadExportTaskResponse,
    ExportTask,
    GetExportTaskRequest,
    GetExportTaskResponse,
)

from config import BaseConfig
def _log_lark_failure(api_name: str, response, with_body: bool = True) -> None:
    """Log a failed Lark API response in the same format for every call site.

    Args:
        api_name: Dotted name of the client method that failed; used as the
            message prefix.
        response: The SDK response object; ``code``, ``msg``, ``get_log_id()``
            and (when ``with_body`` is true) ``raw.content`` are read from it.
        with_body: Whether to append the pretty-printed response body.
    """
    message = (
        f"{api_name} failed, code: {response.code}, msg: {response.msg}, "
        f"log_id: {response.get_log_id()}"
    )
    if with_body:
        raw_content = response.raw.content
        try:
            body = json.dumps(json.loads(raw_content), indent=4, ensure_ascii=False)
        except (TypeError, ValueError):
            # The error body is not guaranteed to be JSON; never let the
            # logging of a failure raise a second, unrelated exception.
            body = str(raw_content)
        message += f", resp: \n{body}"
    lark.logger.error(message)


def f_doc_export(token: str, poll_interval: float = 5.0, max_polls: int = 12) -> Optional[str]:
    """Export a Feishu (Lark) online document to a local Word (.docx) file.

    The export runs in three steps: create an export task, poll the task
    until it yields a file token, then download the file into
    ``BaseConfig.word_save_dir``.

    Args:
        token: Token of the online document to export.
        poll_interval: Seconds to wait before each status query. The default
            matches the previous fixed 5-second wait.
        max_polls: Maximum number of status queries before giving up
            (at least one query is always made).

    Returns:
        The path of the saved file, or ``None`` when any API call fails or
        the export task does not finish in time (the failure is logged).
    """
    word_save_dir = BaseConfig.word_save_dir
    client = (
        lark.Client.builder()
        .app_id(BaseConfig.app_id)
        .app_secret(BaseConfig.app_secret)
        .log_level(lark.LogLevel.DEBUG)
        .build()
    )

    # Step 1: create the export task.
    create_request: CreateExportTaskRequest = (
        CreateExportTaskRequest.builder()
        .request_body(
            ExportTask.builder()
            .file_extension("docx")
            .token(token)
            .type("docx")
            .build()
        )
        .build()
    )
    create_response: CreateExportTaskResponse = client.drive.v1.export_task.create(create_request)
    if not create_response.success():
        _log_lark_failure("client.drive.v1.export_task.create", create_response)
        return None
    lark.logger.info(lark.JSON.marshal(create_response.data, indent=4))
    ticket = create_response.data.ticket

    # Step 2: poll the task. The export is asynchronous, so a single query
    # after a fixed sleep can come back before the file token exists.
    query_request: GetExportTaskRequest = (
        GetExportTaskRequest.builder()
        .ticket(ticket)
        .token(token)
        .build()
    )
    file_token = None
    for _ in range(max(1, max_polls)):
        time.sleep(poll_interval)
        query_response: GetExportTaskResponse = client.drive.v1.export_task.get(query_request)
        if not query_response.success():
            _log_lark_failure("client.drive.v1.export_task.get", query_response)
            return None
        lark.logger.info(lark.JSON.marshal(query_response.data, indent=4))

        result = query_response.data.result if query_response.data else None
        job_status = getattr(result, "job_status", None)
        # NOTE(review): per the Feishu export-task API, job_status 1 means
        # "initializing" and 2 means "processing" -- confirm against the
        # current API reference.
        if job_status in (1, 2):
            continue
        if result is not None and result.file_token:
            file_token = result.file_token
            break
        lark.logger.error(
            f"export task {ticket} finished without a file token, job_status: {job_status}")
        return None

    if file_token is None:
        lark.logger.error(
            f"export task {ticket} did not finish after {max(1, max_polls)} status queries")
        return None

    # Step 3: download the exported file.
    download_request: DownloadExportTaskRequest = (
        DownloadExportTaskRequest.builder()
        .file_token(file_token)
        .build()
    )
    download_response: DownloadExportTaskResponse = client.drive.v1.export_task.download(download_request)
    if not download_response.success():
        _log_lark_failure(
            "client.drive.v1.export_task.download", download_response, with_body=False)
        return None

    if word_save_dir:
        # Create the target directory on first use instead of failing in open().
        os.makedirs(word_save_dir, exist_ok=True)
    # Keep only the final path component so a server-supplied name cannot
    # place the file outside word_save_dir.
    save_path = os.path.join(word_save_dir, os.path.basename(download_response.file_name))
    with open(save_path, "wb") as f:
        f.write(download_response.file.read())
    return save_path
if __name__ == "__main__":
    # Manual smoke run: export one sample document by its token.
    f_doc_export(token="YKNBdbs10oA3pCxTdnAczcvOnxc")
|