msg.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. #!/usr/bin/env python3
  2. # coding=utf-8
  3. '''
  4. @Author: YTX (Tel:15610573978)
  5. @Date: 2022-11-15 16:30:46
  6. @LastEditors: YTX
  7. @LastEditTime: 2023-11-07 12:26:09
  8. '''
  9. import sys
  10. import time
  11. import json
  12. import requests
  13. from dingtalkchatbot.chatbot import DingtalkChatbot
  14. if len(sys.argv) < 2:
  15. print("usage: python3 msg.py https://oapi.dingtalk.com/robo 通知1 通知2 ... ...")
  16. webhook = sys.argv[1]
  17. msg = ' '.join(i for i in sys.argv[2:])
  18. msg = msg + '\n' if msg == '' else '\n' + msg + '\n'
  19. postData = {
  20. "object_kind": "push",
  21. "ref": "refs/heads/master",
  22. "user_name": "真正的内容",
  23. "project": {},
  24. "commits": [],
  25. "repository": {
  26. "name": "test",
  27. }
  28. }
  29. class HeadersUtils():
  30. headers = {"Content-Type": "application/json; charset=UTF-8"}
  31. def retrySendGitLabMsg(inputStr):
  32. postData["user_name"] = inputStr + "\n"
  33. try:
  34. resRetry = requests.post(webhook, data=json.dumps(postData), timeout=(15, 15), headers=HeadersUtils.headers)
  35. print(resRetry.text)
  36. except Exception as error:
  37. print("DingUtils.sendGitLabMsg失败:", error)
  38. xiaoding = DingtalkChatbot(webhook)
  39. try:
  40. res = xiaoding.send_text(msg='# hello 信息通知: ' + msg + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))))
  41. if res['errcode'] != 0:
  42. # LogUtils.error("钉钉通知失败:", res, self.json())
  43. # 使用gitlab方式重试一次
  44. retrySendGitLabMsg(msg)
  45. except Exception as e:
  46. print("钉钉机器人发送失败:", e)