coze_znjd_api.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. -----------------File Info-----------------------
  5. Name: web.py
  6. Description: web api support
  7. Author: GentleCP
  8. Email: me@gentlecp.com
  9. Create Date: 2021/6/19
  10. -----------------End-----------------------------
  11. """
  12. import json
  13. import re
  14. import sys
  15. import threading
  16. import time
  17. import uuid
  18. from xml.etree.ElementTree import fromstring
  19. import requests
  20. import uvicorn
  21. from fastapi import FastAPI, Response, Request, BackgroundTasks, Body
  22. from WXBizMsgCrypt3 import WXBizMsgCrypt
  23. from commom import get_logger, request_id_context
  24. logger = get_logger()
  25. # 加载配置文件
  26. with open('znjd_config.json', 'r') as f:
  27. config = json.load(f)
  28. # 从配置文件中提取参数
  29. token = config['token']
  30. aeskey = config['aeskey']
  31. corpid = config['corpid']
  32. corpsecret = config['corpsecret']
  33. coze_access_token = config['coze_access_token']
  34. bot_id = config['bot_id']
  35. #port = config['port']
  36. # token = "EcSp"#企业微信应用api信息
  37. # aeskey = "OTZoY8N67kOnGosEpS3jw4Rsjea0Gu6D7X4IWxoYKtY"#企业微信应用api信息
  38. # corpid = "ww5541cfeea51e3188"#企业id
  39. # corpsecret = "SbyG25s1LsMsW0nAMiaNprrQIHYrWKQP4f2mNLLDnwE"##api成功后的secret
  40. # coze_access_token = "pat_HNBYQOWE5h4r1tzXi8S2PuY4ddoVRH3DpTbE3NsYBjtcWHTYw5ffrVmKPh26hSLW"#豆包access_token
  41. # bot_id="7397619068440182793"#豆包机器人id
  42. # port = 18090#服务器端口
  43. wxcpt = WXBizMsgCrypt(token, aeskey, corpid)
  44. app = FastAPI()
  45. # def call_llm(prompt: str, bot_id: str,coze_access_token:str):
  46. # req_head = {
  47. # "Authorization":f"Bearer {coze_access_token}",
  48. # "Content-Type": "application/json",
  49. # }
  50. # req_data = {
  51. # "conversation_id": "123",
  52. # "bot_id": bot_id,
  53. # "user": "test",
  54. # "query": prompt,
  55. # "stream": False
  56. # }
  57. # res = requests.post("https://api.coze.cn/open_api/v2/chat", headers=req_head, json=req_data)
  58. # res.raise_for_status() # 检查响应状态码是否为200
  59. # return res.json()
  60. def call_llm(user_query: str, bot_id: str,coze_access_token:str,company_name:str):
  61. req_head = {
  62. "Authorization":f"Bearer {coze_access_token}",
  63. "Content-Type": "application/json",
  64. }
  65. req_data ={
  66. "bot_id": bot_id,
  67. "user_id": "123456789",
  68. "stream": False,
  69. "auto_save_history": True,
  70. "additional_messages": [
  71. {
  72. "role": "user",
  73. "content": user_query,
  74. "content_type": "text"
  75. }
  76. ]
  77. }
  78. res_create = requests.post(" https://api.coze.cn/v1/conversation/create", headers=req_head)
  79. conversation_id = res_create.json()["data"]["id"]
  80. res_chat = requests.post(f" https://api.coze.cn/v3/chat?conversation_id={conversation_id}", headers=req_head,json=req_data)
  81. chat_id = res_chat.json()["data"]["id"]
  82. while True:
  83. res_retrieve = requests.get(f" https://api.coze.cn/v3/chat/retrieve?chat_id={chat_id}&conversation_id={conversation_id}", headers=req_head)
  84. res_json = res_retrieve.json()
  85. # 首先判断网络状态是否为200
  86. if res_retrieve.status_code != 200:
  87. logger.error(f"网络状态码失败,错误码:{res_retrieve.status_code }")
  88. coze_response = f"网络状态码失败,错误码:{res_retrieve.status_code }"
  89. url = ""
  90. return coze_response,company_name,url
  91. # 判断状态码是否为0
  92. if res_json["code"] != 0 :
  93. logger.error(f"API调用失败,错误码:{res_json['code']}")
  94. coze_response = f"API调用失败,错误码:{res_json['code']}"
  95. url = ""
  96. return coze_response,company_name,url
  97. # 打印并记录状态
  98. logger.info(res_json["data"]["status"])
  99. status = res_json["data"]["status"]
  100. # 检查是否为错误状态
  101. error_statuses = {"failed", "requires_action", "canceled"}
  102. if status in error_statuses:
  103. error_message = res_json["data"]["last_error"]
  104. logger.error(f"对话错误,状态:{status},错误信息:{error_message}")
  105. coze_response = f"对话错误,状态:{status},错误信息:{error_message}"
  106. url = ""
  107. return coze_response,company_name,url
  108. # 如果状态为completed,则获取消息
  109. if status == "completed":
  110. res_message = requests.get(f"https://api.coze.cn/v3/chat/message/list?chat_id={chat_id}&conversation_id={conversation_id}", headers=req_head)
  111. aa = res_message.json()
  112. # 假设res_message是已经获取到的响应对象
  113. data = res_message.json()['data']
  114. # 使用列表推导式找到所有type为'answer'的记录,然后取最后一个
  115. last_answer_record = next((record for record in reversed(data) if record['type'] == 'answer'), None)
  116. # 如果找到了符合条件的记录,则处理content
  117. if last_answer_record:
  118. report_link = last_answer_record['content'].replace(" ", "")
  119. url_start_index = report_link.find('http')
  120. url = report_link[url_start_index:]
  121. # coze_response = f'{company_name}的报告已生成:<a href="{url}" target="_blank">报告下载链接</a>'
  122. # coze_response = f'<a href="{url}" target="_blank">{company_name}报告下载链接</a>'
  123. # coze_response = f'<a href="{url}" target="_blank">{company_name}报告下载链接</a> - 原始链接:<a href="{url}" target="_blank">{url}</a>'
  124. coze_response = f'[{company_name}报告下载链接]({url}) - 原始链接:[{url}]({url})'
  125. else:
  126. # 如果没有找到符合条件的记录,则输出提示信息
  127. coze_response = f"{company_name}报告未生成"
  128. url = ""
  129. # report_link = res_message.json()['data'][-2]['content'].replace(" ", "") # v3 删除图片url中的空格
  130. # coze_response = coze_response['data'][1]['content'].replace(" ", "") # v3 删除图片url中的空格
  131. # 提取URL
  132. # 创建HTML链接
  133. return coze_response,company_name,url
  134. time.sleep(1)
  135. def qiwei_get():
  136. res = requests.get(f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}")
  137. qw_access_token = res.json()["access_token"]
  138. return qw_access_token
  139. def qiwei_post_text(username: str, answer: str,agentid:str):
  140. req_data = {
  141. "touser": username,
  142. "toparty": "",
  143. "totag": "",
  144. "msgtype": "text",
  145. "agentid": agentid,
  146. "text": {"content": answer},
  147. "image": {
  148. "media_id": "MEDIA_ID"
  149. },
  150. "safe": 0,
  151. "enable_id_trans": 0,
  152. "enable_duplicate_check": 0,
  153. "duplicate_check_interval": 1800
  154. }
  155. res = requests.post(f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={qiwei_get()}", json=req_data)
  156. # print(res.json())
  157. logger.info(res.json())
  158. #return res.json()
  159. def qiwei_post_card(username: str, answer: str,agentid:str,company_name:str,url:str):
  160. req_data ={
  161. "touser": username,
  162. "toparty": "",
  163. "totag": "",
  164. "msgtype": "textcard",
  165. "agentid": agentid,
  166. "textcard": {
  167. "title": f"报告已生成(请在十分钟内点击下载)",
  168. "description": f"<div class=\"gray\">{company_name}报告下载链接</div> ",
  169. "url": url,
  170. "btntxt": ""
  171. },
  172. "enable_id_trans": 0,
  173. "enable_duplicate_check": 0,
  174. "duplicate_check_interval": 1800
  175. }
  176. res = requests.post(f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={qiwei_get()}", json=req_data)
  177. # print(res.json())
  178. logger.info(res.json())
  179. #return res.json()
  180. #问题传入字节服务器进行回答后发送给企业微信,行内服务器只进行接收然后发给字节,防止网络延迟
  181. def post_consumer_api(user_query, decrypt_data, request_id,company_name):
  182. data = {
  183. "user_query": user_query,
  184. "decrypt_data": decrypt_data,
  185. "request_id": request_id,
  186. "company_name":company_name,
  187. }
  188. request_id_context.set(request_id)
  189. url = "https://101.126.81.2:18093/consumer"
  190. try:
  191. logger.info(f"post_consumer_api 被执行{user_query}")
  192. t1 = threading.Thread(target=requests.post, kwargs={"url": url, "json": data, "verify": False})
  193. t1.start()
  194. logger.info(f"post_consumer_api 请求成功: {t1}")
  195. except requests.exceptions.RequestException as e:
  196. logger.error(f"post_consumer_api 请求失败: {e}")
  197. @app.post("/consumer")
  198. def consumer(
  199. request_id: str = Body(...),
  200. user_query: str = Body(...),
  201. decrypt_data: dict = Body(...),
  202. company_name: str = Body(...),
  203. ):
  204. # print(f"请求:{user_query}")
  205. # body = await request.body()
  206. # body = body.decode()
  207. # body = json.loads(body)
  208. # request_id = body["request_id"]
  209. request_id_context.set(request_id)
  210. # user_query = body["user_query"]
  211. # company_name = body["company_name"]
  212. # decrypt_data = body["decrypt_data"]
  213. # bot_id = body["bot_id"]
  214. username = decrypt_data.get('FromUserName', '')
  215. agentid = decrypt_data.get('AgentID', '')
  216. qiwei_post_text(username, "正在加载,请稍后,预计2分钟...", agentid)
  217. logger.info("正在加载,请稍后,预计2分钟...")
  218. logger.info(f"consumer 请求:{user_query}")
  219. # 返回coze结果
  220. coze_response,company_name,url = call_llm(user_query=user_query,bot_id=bot_id,coze_access_token = coze_access_token,company_name= company_name)
  221. # answer = coze_response['messages'][1]['content']#v2
  222. # answer = coze_response['data'][1]['content'].replace(" ","") #v3 删除图片url中的空格
  223. # print(coze_response)
  224. answer = coze_response
  225. # ##处理图片链接
  226. # image_counter = 1
  227. # # 定义一个替换函数,用于在替换时添加序号
  228. # def replace_with_counter(match):
  229. # nonlocal image_counter
  230. # alt_text = match.group(1) or f"示例图片{image_counter}"
  231. # url = match.group(2)
  232. # replacement = f'<a href="{url}">{alt_text}</a>'
  233. # image_counter += 1
  234. # return replacement
  235. # # 将Markdown格式的图片链接转换为HTML格式的文字链接,并添加序号
  236. # answer = re.sub(r'!\[(.*?)\]\((https?://[^)]+)\)', replace_with_counter, coze_response)
  237. # print(f"结果:{answer}")
  238. logger.info(f"结果:{answer}")
  239. # 主动发结果给qiwei
  240. qiwei_post_card(username, answer, agentid,company_name,url)
  241. @app.get("/ok")
  242. async def ok():
  243. return "ok"
  244. @app.get("/bot")
  245. async def verify(msg_signature: str, timestamp: str, nonce: str, echostr: str):
  246. ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
  247. if ret == 0:
  248. return Response(content=sEchoStr.decode('utf-8'))
  249. else:
  250. # print(sEchoStr)
  251. logger.info(sEchoStr)
  252. # 创建一个字典来存储用户的bot_id状态
  253. user_bot_id_mapping = {}
  254. @app.post("/bot")
  255. async def recv(msg_signature: str, timestamp: str, nonce: str, request: Request, background_tasks: BackgroundTasks):
  256. #start_time = time.time()
  257. body = await request.body()
  258. request_id = str(uuid.uuid4())
  259. request_id_context.set(request_id)
  260. ret, sMsg = wxcpt.DecryptMsg(body.decode('utf-8'), msg_signature, timestamp, nonce)
  261. decrypt_data = {}
  262. for node in list(fromstring(sMsg.decode('utf-8'))):
  263. decrypt_data[node.tag] = node.text
  264. # 获取用户发送的消息内容
  265. company_name = decrypt_data.get('Content', '')
  266. logger.info(f"start user_query: {company_name}")
  267. logger.info(f"start request_id: {request_id}")
  268. user_query = f"""query:{company_name}
  269. request_id: {request_id}"""
  270. # 处理其他类型的消息
  271. logger.info(f"start prompt: {user_query}")
  272. # background_tasks.add_task(post_consumer_api, user_query, decrypt_data, request_id,company_name)
  273. post_consumer_api(user_query, decrypt_data, request_id, company_name)
  274. return Response(content="")
  275. if __name__ == "__main__":
  276. # coze_response = call_llm(prompt="房快贷是什么",bot_id=bot_id,coze_access_token = coze_access_token)
  277. # print(coze_response)
  278. try:
  279. port = sys.argv[1]
  280. int(port)
  281. except:
  282. port = 18067
  283. request_id_context.set("app start")
  284. logger.info(f'{port=}')
  285. print(f'{port=}')
  286. uvicorn.run("coze_znjd_api:app", port=int(port), host='0.0.0.0', reload=False,ssl_keyfile="./key.pem", ssl_certfile="./cert.pem")