1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 |
- #!/usr/bin/env python3
- # coding=utf-8
- '''
- @Author: YTX (Tel:15610573978)
- @Date: 2022-11-15 16:30:46
- @LastEditors: YTX
- @LastEditTime: 2023-11-07 12:26:09
- '''
- import sys
- import time
- import json
- import requests
- from dingtalkchatbot.chatbot import DingtalkChatbot
- if len(sys.argv) < 2:
- print("usage: python3 msg.py https://oapi.dingtalk.com/robo 通知1 通知2 ... ...")
- webhook = sys.argv[1]
- msg = ' '.join(i for i in sys.argv[2:])
- msg = msg + '\n' if msg == '' else '\n' + msg + '\n'
- postData = {
- "object_kind": "push",
- "ref": "refs/heads/master",
- "user_name": "真正的内容",
- "project": {},
- "commits": [],
- "repository": {
- "name": "test",
- }
- }
- class HeadersUtils():
- headers = {"Content-Type": "application/json; charset=UTF-8"}
- def retrySendGitLabMsg(inputStr):
- postData["user_name"] = inputStr + "\n"
- try:
- resRetry = requests.post(webhook, data=json.dumps(postData), timeout=(15, 15), headers=HeadersUtils.headers)
- print(resRetry.text)
- except Exception as error:
- print("DingUtils.sendGitLabMsg失败:", error)
- xiaoding = DingtalkChatbot(webhook)
- try:
- res = xiaoding.send_text(msg='# hello 信息通知: ' + msg + str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))))
- if res['errcode'] != 0:
- # LogUtils.error("钉钉通知失败:", res, self.json())
- # 使用gitlab方式重试一次
- retrySendGitLabMsg(msg)
- except Exception as e:
- print("钉钉机器人发送失败:", e)
|