coze_bot_api.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. -----------------File Info-----------------------
  5. Name: coze_bot_api.py
  6. Description: web api support
  7. Author: GentleCP
  8. Email: me@gentlecp.com
  9. Create Date: 2021/6/19
  10. -----------------End-----------------------------
  11. """
  12. import time
  13. from django.db.models.fields import return_None
  14. from fastapi import FastAPI, Response, Request, BackgroundTasks
  15. from WXBizMsgCrypt3 import WXBizMsgCrypt
  16. from xml.etree.ElementTree import fromstring
  17. import uvicorn
  18. import requests
  19. import json
  20. # 加载配置文件
  21. with open('config.json', 'r') as f:
  22. config = json.load(f)
  23. # 从配置文件中提取参数
  24. token = config['token']
  25. aeskey = config['aeskey']
  26. corpid = config['corpid']
  27. corpsecret = config['corpsecret']
  28. coze_access_token = config['coze_access_token']
  29. bot_id = config['bot_id']
  30. port = config['port']
  31. # token = "EcSp"#企业微信应用api信息
  32. # aeskey = "OTZoY8N67kOnGosEpS3jw4Rsjea0Gu6D7X4IWxoYKtY"#企业微信应用api信息
  33. # corpid = "ww5541cfeea51e3188"#企业id
  34. # corpsecret = "SbyG25s1LsMsW0nAMiaNprrQIHYrWKQP4f2mNLLDnwE"##api成功后的secret
  35. # coze_access_token = "pat_HNBYQOWE5h4r1tzXi8S2PuY4ddoVRH3DpTbE3NsYBjtcWHTYw5ffrVmKPh26hSLW"#豆包access_token
  36. # bot_id="7397619068440182793"#豆包机器人id
  37. # port = 18090#服务器端口
  38. wxcpt = WXBizMsgCrypt(token, aeskey, corpid)
  39. app = FastAPI()
  40. # def call_llm(prompt: str, bot_id: str,coze_access_token:str):
  41. # req_head = {
  42. # "Authorization":f"Bearer {coze_access_token}",
  43. # "Content-Type": "application/json",
  44. # }
  45. # req_data = {
  46. # "conversation_id": "123",
  47. # "bot_id": bot_id,
  48. # "user": "test",
  49. # "query": prompt,
  50. # "stream": False
  51. # }
  52. # res = requests.post("https://api.coze.cn/open_api/v2/chat", headers=req_head, json=req_data)
  53. # res.raise_for_status() # 检查响应状态码是否为200
  54. # return res.json()
  55. def call_llm(prompt: str, bot_id: str,coze_access_token:str):
  56. req_head = {
  57. "Authorization":f"Bearer {coze_access_token}",
  58. "Content-Type": "application/json",
  59. }
  60. req_data ={
  61. "bot_id": bot_id,
  62. "user_id": "123456789",
  63. "stream": False,
  64. "auto_save_history": True,
  65. "additional_messages": [
  66. {
  67. "role": "user",
  68. "content": prompt,
  69. "content_type": "text"
  70. }
  71. ]
  72. }
  73. res_create = requests.post(" https://api.coze.cn/v1/conversation/create", headers=req_head)
  74. coversition_id = res_create.json()["data"]["id"]
  75. res_chat = requests.post(f" https://api.coze.cn/v3/chat?conversation_id={coversition_id}", headers=req_head,json=req_data)
  76. chat_id = res_chat.json()["data"]["id"]
  77. while True:
  78. res_retrieve = requests.get(f" https://api.coze.cn/v3/chat/retrieve?chat_id={chat_id}&conversation_id={coversition_id}", headers=req_head)
  79. print(res_retrieve.json()["data"]["status"])
  80. status = res_retrieve.json()["data"]["status"]
  81. if status == "completed":
  82. res_message = requests.get(f" https://api.coze.cn/v3/chat/message/list?chat_id={chat_id}&conversation_id={coversition_id}", headers=req_head)
  83. # print(res_message.json())
  84. return res_message.json()
  85. time.sleep(1)
  86. # res.raise_for_status() # 检查响应状态码是否为200
  87. # return res.json()
  88. def qiwei_get():
  89. res = requests.get(f"https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={corpsecret}")
  90. qw_access_token = res.json()["access_token"]
  91. return qw_access_token
  92. def qiwei_post(username: str, answer: str,agentid:str):
  93. req_data = {
  94. "touser": username,
  95. "toparty": "",
  96. "totag": "",
  97. "msgtype": "text",
  98. "agentid": agentid,
  99. "text": {"content": answer},
  100. "safe": 0,
  101. "enable_id_trans": 0,
  102. "enable_duplicate_check": 0,
  103. "duplicate_check_interval": 1800
  104. }
  105. res = requests.post(f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={qiwei_get()}", json=req_data)
  106. print(res.json())
  107. #return res.json()
  108. def consumer(user_query,decrypt_data):
  109. print(f"请求:{user_query}")
  110. username = decrypt_data.get('FromUserName', '')
  111. agentid = decrypt_data.get('AgentID', '')
  112. # 返回coze结果
  113. coze_response = call_llm(prompt=user_query,bot_id=bot_id,coze_access_token = coze_access_token)
  114. answer = coze_response['data'][1]['content']
  115. print(f"结果:{answer}")
  116. # 主动发结果给qiwei
  117. qiwei_post(username, answer, agentid)
  118. @app.get("/bot")
  119. async def verify(msg_signature: str, timestamp: str, nonce: str, echostr: str):
  120. ret, sEchoStr = wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
  121. if ret == 0:
  122. return Response(content=sEchoStr.decode('utf-8'))
  123. else:
  124. print(sEchoStr)
  125. @app.post("/bot")
  126. async def recv(msg_signature: str, timestamp: str, nonce: str, request: Request, background_tasks: BackgroundTasks):
  127. #start_time = time.time()
  128. body = await request.body()
  129. ret, sMsg = wxcpt.DecryptMsg(body.decode('utf-8'), msg_signature, timestamp, nonce)
  130. decrypt_data = {}
  131. for node in list(fromstring(sMsg.decode('utf-8'))):
  132. decrypt_data[node.tag] = node.text
  133. user_query = decrypt_data.get('Content', '')
  134. background_tasks.add_task(consumer, user_query, decrypt_data)
  135. return Response(content="")
  136. if __name__ == "__main__":
  137. uvicorn.run("coze_bot_api:app", port=port, host='0.0.0.0', reload=False,ssl_keyfile="./key.pem", ssl_certfile="./cert.pem")