Python 版 OneAlert API 集成调用源码

airflow
monitor
python
sourcecode
alert

#1

效果:

源码

# coding: UTF-8
# importing the requests library 
import requests

import json

# OneAlert event API endpoint that receives alert events.
API_ENDPOINT = "http://api.onealert.com/alert/api/event/"

# Application key of the alert integration. Log in to www.onealert.com and
# copy it from the Configuration -> Applications page.
APPKEY = "xxxxxxxxxxxxxx"

# Event payload that is serialized to JSON and posted to API_ENDPOINT.
payload = {
    'app': APPKEY,            # string, required: application key of the alert integration
    'eventType': 'trigger',   # string, required: 'trigger' raises an alert, 'resolve' closes it
    'eventId': '123456',      # string, required: event ID, used for alert compression and closing
    'alarmContent': 'Airflow alert: <TaskInstance: userprofile.usergroup_check 2018-09-20 10:00:00 [failed]>',  # string, optional
    'priority': 1,            # int, optional: alert level; 1 = notice, 2 = warning, 3 = critical
    'entityName': 'Airflow',
    'details': {
        # Built from adjacent literals with explicit "\n" separators. A
        # backslash-newline inside a single string literal would drop the
        # line break and pull the next line's indentation into the text.
        # NOTE(review): one item per line is the assumed intended layout.
        "details": (
            "Try 2 out of 2\n"
            "Exception:\n"
            "Bash command failed\n"
            "Log: Link\n"
            "Host:bi-airflow\n"
            "Log file: /data1/logs/airflow/userprofile/usergroup_check/2018-09-20T10:00:00.log"
        ),
    },
    # Extra context entries attached to the alert: two links and one image.
    'contexts': [
        {
            "type": "link",
            "text": "generatorURL",
            "href": "http://www.baidu.com",
        },
        {
            "type": "link",
            "href": "http://wiki.onealert.com/api/event_api.html",
            "text": "查看帮助",
        },
        {
            "type": "image",
            "src": "http://s.oneapm.com/Soho.png",
        },
    ],
}

# Post the event as a JSON body and keep the response object in `r`.
# The Content-Type header declares the body as JSON, and the timeout stops
# the script from blocking forever if the API is unreachable (requests
# applies no timeout by default).
r = requests.post(
    url=API_ENDPOINT,
    data=json.dumps(payload),
    headers={"Content-Type": "application/json"},
    timeout=10,
)

# Print the HTTP status and the response body so a rejected event is
# visible to whoever runs the script.
print("OneAlert API response (HTTP %s): %s" % (r.status_code, r.text))