coze_bot_api_test.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. -----------------File Info-----------------------
  5. Name: web.py
  6. Description: web api support
  7. Author: GentleCP
  8. Email: me@gentlecp.com
  9. Create Date: 2021/6/19
  10. -----------------End-----------------------------
  11. """
  12. import asyncio
  13. import json
  14. import re
  15. import sys
  16. import time
  17. import uuid
  18. from concurrent.futures import ThreadPoolExecutor
  19. from xml.etree.ElementTree import fromstring
  20. import requests
  21. import uvicorn
  22. from anyio import CapacityLimiter
  23. from anyio.lowlevel import RunVar
  24. from fastapi import FastAPI, Response, Request, Body, BackgroundTasks
  25. from WXBizMsgCrypt3 import WXBizMsgCrypt
  26. from commom import get_logger, request_id_context
  27. timeout = 30
  28. logger = get_logger()
  29. # 加载配置文件
  30. with open('config.json', 'r') as f:
  31. config = json.load(f)
  32. # 从配置文件中提取参数
  33. token = config['token']
  34. aeskey = config['aeskey']
  35. corpid = config['corpid']
  36. corpsecret = config['corpsecret']
  37. coze_access_token = config['coze_access_token']
  38. # bot_id = config['bot_id']
  39. # port = config['port']
  40. # token = "EcSp"#企业微信应用api信息
  41. # aeskey = "OTZoY8N67kOnGosEpS3jw4Rsjea0Gu6D7X4IWxoYKtY"#企业微信应用api信息
  42. # corpid = "ww5541cfeea51e3188"#企业id
  43. # corpsecret = "SbyG25s1LsMsW0nAMiaNprrQIHYrWKQP4f2mNLLDnwE"##api成功后的secret
  44. # coze_access_token = "pat_HNBYQOWE5h4r1tzXi8S2PuY4ddoVRH3DpTbE3NsYBjtcWHTYw5ffrVmKPh26hSLW"#豆包access_token
  45. # bot_id="7397619068440182793"#豆包机器人id
  46. # port = 18090#服务器端口
  47. wxcpt = WXBizMsgCrypt(token, aeskey, corpid)
  48. app = FastAPI()
  49. executor = ThreadPoolExecutor(max_workers=200)
  50. @app.on_event("startup")
  51. def startup():
  52. print("start")
  53. RunVar("_default_thread_limiter").set(CapacityLimiter(210))
  54. # def call_llm(prompt: str, bot_id: str,coze_access_token:str):
  55. # req_head = {
  56. # "Authorization":f"Bearer {coze_access_token}",
  57. # "Content-Type": "application/json",
  58. # }
  59. # req_data = {
  60. # "conversation_id": "123",
  61. # "bot_id": bot_id,
  62. # "user": "test",
  63. # "query": prompt,
  64. # "stream": False
  65. # }
  66. # res = requests.post("https://api.coze.cn/open_api/v2/chat", headers=req_head, json=req_data)
  67. # res.raise_for_status() # 检查响应状态码是否为200
  68. # return res.json()
  69. def call_llm(prompt: str, bot_id: str, coze_access_token: str):
  70. req_head = {
  71. "Authorization": f"Bearer {coze_access_token}",
  72. "Content-Type": "application/json",
  73. }
  74. req_data = {
  75. "bot_id": bot_id,
  76. "user_id": "123456789",
  77. "stream": False,
  78. "auto_save_history": True,
  79. "additional_messages": [
  80. {
  81. "role": "user",
  82. "content": prompt,
  83. "content_type": "object_string"
  84. }
  85. ]
  86. }
  87. res_create = requests.post(" https://api.coze.cn/v1/conversation/create", headers=req_head)
  88. conversation_id = res_create.json()["data"]["id"]
  89. res_chat = requests.post(f" https://api.coze.cn/v3/chat?conversation_id={conversation_id}", headers=req_head,
  90. json=req_data)
  91. chat_id = res_chat.json()["data"]["id"]
  92. while True:
  93. # logger.info(f"{prompt=}")
  94. res_retrieve = requests.get(
  95. f" https://api.coze.cn/v3/chat/retrieve?chat_id={chat_id}&conversation_id={conversation_id}",
  96. headers=req_head)
  97. res_json = res_retrieve.json()
  98. # 首先判断网络状态是否为200
  99. if res_retrieve.status_code != 200:
  100. # logger.error(f"网络状态码失败,错误码:{res_retrieve.status_code}")
  101. coze_response = f"网络状态码失败,错误码:{res_retrieve.status_code}"
  102. return coze_response
  103. # 判断状态码是否为0
  104. if res_json["code"] != 0:
  105. # logger.error(f"API调用失败,错误码:{res_json['code']}")
  106. coze_response = f"API调用失败,错误码:{res_json['code']}"
  107. return coze_response
  108. # 打印并记录状态
  109. # logger.info(res_json["data"]["status"])
  110. status = res_json["data"]["status"]
  111. # 检查是否为错误状态
  112. error_statuses = {"failed", "requires_action", "canceled"}
  113. if status in error_statuses:
  114. error_message = res_json["data"]["last_error"]
  115. # logger.error(f"对话错误,状态:{status},错误信息:{error_message}")
  116. coze_response = f"对话错误,状态:{status},错误信息:{error_message}"
  117. return coze_response
  118. # 如果状态为completed,则获取消息
  119. if status == "completed":
  120. res_message = requests.get(
  121. f"https://api.coze.cn/v3/chat/message/list?chat_id={chat_id}&conversation_id={conversation_id}",
  122. headers=req_head)
  123. # 假设res_message是已经获取到的响应对象
  124. data = res_message.json()['data']
  125. # 使用列表推导式找到所有type为'answer'的记录,然后取最后一个
  126. last_answer_record = next((record for record in reversed(data) if record['type'] == 'answer'), None)
  127. coze_response = last_answer_record['content'].replace(" ", "") # v3 删除图片url中的空格
  128. # coze_response = coze_response['data'][1]['content'].replace(" ", "") # v3 删除图片url中的空格
  129. return coze_response
  130. time.sleep(1)
  131. def qiwei_get():
  132. res = requests.get(f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}")
  133. qw_access_token = res.json()["access_token"]
  134. return qw_access_token
  135. def qiwei_post(username: str, answer: str, agentid: str):
  136. req_data = {
  137. "touser": username,
  138. "toparty": "",
  139. "totag": "",
  140. "msgtype": "text",
  141. "agentid": agentid,
  142. "text": {"content": answer},
  143. "image": {
  144. "media_id": "MEDIA_ID"
  145. },
  146. "safe": 0,
  147. "enable_id_trans": 0,
  148. "enable_duplicate_check": 0,
  149. "duplicate_check_interval": 1800
  150. }
  151. res = requests.post(f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={qiwei_get()}", json=req_data)
  152. # print(res.json())
  153. logger.info(res.json())
  154. # return res.json()
  155. # 问题传入字节服务器进行回答后发送给企业微信,行内服务器只进行接收然后发给字节,防止网络延迟
  156. async def post_consumer_api(user_query, decrypt_data, request_id):
  157. data = {
  158. "user_query": user_query,
  159. "decrypt_data": decrypt_data,
  160. "request_id": request_id,
  161. }
  162. request_id_context.set(request_id)
  163. url = "https://101.126.81.2:18090/consumer"
  164. try:
  165. logger.info(f"post_consumer_api {user_query=}")
  166. def consumer(data):
  167. re = requests.post(url=url, json=data, verify=False)
  168. return re.text
  169. loop = asyncio.get_event_loop()
  170. result = await loop.run_in_executor(executor, consumer, data)
  171. logger.info(f"post_consumer_api {result=}")
  172. return result
  173. # t1 = threading.Thread(target=requests.post, kwargs={"url": url, "json": data, "verify": False})
  174. # t1.start()
  175. # response = requests.post(url, json=data, verify=False) # 忽略SSL证书验证
  176. # response.raise_for_status() # 检查响应状态码是否为200
  177. # logger.info(f"post_consumer_api 请求成功: {response.json()}")
  178. except requests.exceptions.RequestException as e:
  179. logger.error(f"post_consumer_api 请求失败: {e}")
  180. return "error"
  181. # 创建一个字典来存储用户的bot_id状态
  182. user_bot_id_mapping = {}
  183. # 新增:存储用户是否已收到欢迎消息的状态
  184. user_welcome_status = {}
  185. @app.post("/consumer")
  186. async def consumer(
  187. request_id: str = Body(...),
  188. user_query: str = Body(...),
  189. decrypt_data: dict = Body(...),
  190. ):
  191. # print(f"请求:{user_query}")
  192. # {
  193. # "request_id":"",
  194. # "user_query": "",
  195. # "bot_id": "",
  196. # "decrypt_data": {'ToUserName': 'ww5541cfeea51e3188', 'FromUserName': 'ZhuSanCheng', 'CreateTime': '1739241434', 'MsgType': 'text', 'Content': '产品介绍', 'MsgId': '7469985082764423235', 'AgentID': '1000002'}
  197. # }
  198. # body = body.decode()
  199. # body = json.loads(body)
  200. # request_id = body["request_id"]
  201. request_id_context.set(request_id)
  202. # user_query = body["user_query"] if body["user_query"] else "回答图片中的问题"
  203. # user_query = "回答图片中的问题"
  204. # decrypt_data = body["decrypt_data"]
  205. # bot_id = body["bot_id"]
  206. # decrypt_data = json.loads(decrypt_data)
  207. username = decrypt_data.get('FromUserName', '')
  208. agentid = decrypt_data.get('AgentID', '')
  209. msgtype = decrypt_data.get('MsgType', '')
  210. picurl = decrypt_data.get('PicUrl', '')
  211. event = decrypt_data.get('Event')
  212. msg_type = decrypt_data.get('MsgType')
  213. event_key = decrypt_data.get('EventKey')
  214. print(f"event_key: {event_key}")
  215. # 修改:仅在用户首次进入时发送欢迎消息
  216. if msg_type == 'event' and event == 'enter_agent':
  217. if not user_welcome_status.get(username, False): # 检查是否首次进入
  218. welcome_message = "Hi,我是小微AI助手~你可以在屏幕底部“产品选择”菜单栏选择想咨询的产品,我会随时为你解答问题~"
  219. qiwei_post(username, welcome_message, agentid)
  220. user_welcome_status[username] = True # 标记为已发送
  221. return Response(content="")
  222. # 根据用户发送的消息内容切换bot_id
  223. if msg_type == 'event' and event == 'click' and event_key == '#sendmsg#_0_0#7599826213209000':
  224. bot_id = "7456977536891846697" # 当用户发送渝快振兴贷时使用的bot_id
  225. change_message = "您好,已切换为渝快振兴贷产品助手,请输入问题。"
  226. qiwei_post(username, change_message, agentid)
  227. user_bot_id_mapping[username] = bot_id # 更新用户的bot_id状态
  228. return Response(content="")
  229. elif msg_type == 'event' and event == 'click' and event_key == '#sendmsg#_0_1#7599826213209001':
  230. bot_id = "7445101065005154313" # 当用户发送房快贷时使用的bot_id
  231. change_message = "您好,已切换为房快贷产品助手,请输入问题。"
  232. qiwei_post(username, change_message, agentid)
  233. user_bot_id_mapping[username] = bot_id # 更新用户的bot_id状态
  234. return Response(content="")
  235. else:
  236. # 如果用户之前已经选择了bot_id,则使用之前的bot_id
  237. bot_id = user_bot_id_mapping.get(username, "7456977536891846697") # 默认bot_id
  238. qiwei_post(username, "我正在思考,请稍等...", agentid)
  239. logger.info("我正在思考,请稍等...")
  240. logger.info(f"consumer 请求:{user_query}")
  241. user_query = user_query if user_query else "回答图片中的问题"
  242. multimodal_content = [
  243. {"type": "text", "text": user_query},
  244. {"type": msgtype, "file_url": picurl},
  245. # {"type": "file", "file_id": "fileid2"},
  246. # {"type": "file", "file_url": "fileurl1"}
  247. ]
  248. user_query = json.dumps(multimodal_content, ensure_ascii=False)
  249. # 返回coze结果
  250. loop = asyncio.get_event_loop()
  251. future = loop.run_in_executor(executor, call_llm, user_query, bot_id, coze_access_token)
  252. try:
  253. coze_response = await asyncio.wait_for(future, timeout)
  254. except asyncio.exceptions.TimeoutError as err:
  255. return Response(content="timeout")
  256. # coze_response = call_llm(prompt=user_query, bot_id=bot_id, coze_access_token=coze_access_token)
  257. # answer = coze_response['messages'][1]['content']#v2
  258. # answer = coze_response['data'][1]['content'].replace(" ","") #v3 删除图片url中的空格
  259. ##处理图片链接
  260. image_counter = 1
  261. # 定义一个替换函数,用于在替换时添加序号
  262. def replace_with_counter(match):
  263. nonlocal image_counter
  264. alt_text = match.group(1) or f"示例图片{image_counter}"
  265. url = match.group(2)
  266. replacement = f'<a href="{url}">{alt_text}</a>'
  267. image_counter += 1
  268. return replacement
  269. # 将Markdown格式的图片链接转换为HTML格式的文字链接,并添加序号
  270. answer = re.sub(r'!\[(.*?)\]\((https?://[^)]+)\)', replace_with_counter, coze_response)
  271. # print(f"结果:{answer}")
  272. logger.info(f"结果:{answer}")
  273. qiwei_post(username, answer, agentid)
  274. return Response(content=answer)
  275. # 主动发结果给qiwei
  276. # choice_answer = "若需要切换产品助手请输入产品名称(渝快振兴贷、房快贷),无需切换请忽略"
  277. # qiwei_post(username, choice_answer, agentid)
  278. @app.get("/ok")
  279. def ok():
  280. return "ok"
  281. @app.get("/bot")
  282. def verify(msg_signature: str, timestamp: str, nonce: str, echostr: str):
  283. ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
  284. if ret == 0:
  285. return Response(content=sEchoStr.decode('utf-8'))
  286. else:
  287. # print(sEchoStr)
  288. logger.info(sEchoStr)
  289. #
  290. @app.post("/bot")
  291. async def recv(msg_signature: str, timestamp: str, nonce: str, request: Request, background_tasks: BackgroundTasks):
  292. # start_time = time.time()
  293. body = await request.body()
  294. request_id = str(uuid.uuid4())
  295. request_id_context.set(request_id)
  296. ret, sMsg = wxcpt.DecryptMsg(body.decode('utf-8'), msg_signature, timestamp, nonce)
  297. decrypt_data = {}
  298. for node in list(fromstring(sMsg.decode('utf-8'))):
  299. decrypt_data[node.tag] = node.text
  300. # print(body)
  301. # 获取用户发送的消息内容
  302. # decrypt_data = json.loads(body)
  303. user_query = decrypt_data.get('Content', '')
  304. # logger.info(f"start: {user_query}")
  305. # print(event_key)
  306. # 处理其他类型的消息
  307. logger.info(f"start: {user_query}")
  308. # background_tasks.add_task(post_consumer_api, user_query, decrypt_data, request_id, bot_id)
  309. result = await post_consumer_api(user_query, decrypt_data, request_id)
  310. # return Response(content="")
  311. return Response(content=result)
  312. # user_query = decrypt_data.get('Content', '')
  313. # logger.info(f"start: {user_query}")
  314. # background_tasks.add_task(post_consumer_api, user_query, decrypt_data, request_id)
  315. # data = {
  316. # "user_query":user_query,
  317. # "decrypt_data":decrypt_data
  318. # }
  319. # requests.post(
  320. # f"https://101.126.81.2:18066/consumer",
  321. # data=data)
  322. # return Response(content="")
  323. if __name__ == "__main__":
  324. # coze_response = call_llm(prompt="房快贷是什么",bot_id=bot_id,coze_access_token = coze_access_token)
  325. # print(coze_response)
  326. try:
  327. port = sys.argv[1]
  328. int(port)
  329. except:
  330. port = 18090
  331. request_id_context.set("app start")
  332. logger.info(f'{port=}')
  333. print(f'{port=}')
  334. uvicorn.run("coze_bot_api_test:app", port=int(port), host='0.0.0.0', reload=False, ssl_keyfile="./key.pem",
  335. ssl_certfile="./cert.pem")