python 企业微信告警
目标:
为了实现服务器性能监控指标告警,通过alertmanager触发告警后发送到webhook,然后通过python脚本实现数据清洗,并把告警内容发送给指定的企业微信用户。
由于频繁获取企业微信api可能会管理员拉入小黑屋,所有需要把获取access_token缓存到redis上,然后取数据在redis获取即可。
#!/usr/bin/env Python# -*- coding:utf-8 -*-import jsonimport timeimport requestsimport osimport redisfrom flask import request, Flask class qywx_obj: def __init__(self): self.CORPID = 'xxxxxxxxxxxxx' # 企业的corpid self.CORPSECRET_SEND_MSG = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' # 应用的secret self.access_token = self.get_access_token(self.CORPSECRET_SEND_MSG) self.AGENTID = 1000002 self.user_info = self.get_user_info() def get_access_token(self, CORPSECRET): url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken' values = { 'corpid': self.CORPID, 'corpsecret': CORPSECRET, } req = requests.post(url, params=values) ACCESS_TOKEN = json.loads(req.text)["access_token"] return ACCESS_TOKEN def get_user_info(self): GET_USER_URL = 'https://qyapi.weixin.qq.com/cgi-bin/user/list' CORPSECRET_GET_INFO = 'xxxxxxxxxxxxxxx' # 联系人的secret 用来获取部门的成员信息 access_token = self.get_access_token(CORPSECRET_GET_INFO) values = { 'access_token': access_token, 'department_id': 1, 'fetch_child': 1, } USER_INFO = json.loads(requests.post(GET_USER_URL, params=values).text)["userlist"] return USER_INFO def send_data(self, TOUSER, message, access_token): send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' self.access_token send_values = { "touser": TOUSER, "msgtype": "text", "agentid": 1000002, "text": { "content": message }, "safe": "0" } send_msges=(bytes(json.dumps(send_values), 'utf-8')) respone = requests.post(send_url, send_msges) respone = respone.json() return respone["errmsg"]class redis_obj: def __init__(self): self.host = "172.16.40.24" self.port = 6379 self.r = self.get_redis_obj() def get_redis_obj(self): pool=redis.ConnectionPool(host=self.host,port=self.port,db=0) r = redis.StrictRedis(connection_pool=pool) return r def redis_cache(self, key, data): self.r.set(key, data) self.r.expire(key, 3600) def get_access_from_redis(self, key): if self.r.get(key): return self.r.get(key) def judge_redis_keys(self, key): if self.r.exists(key) == 1: return True else: return False def GetData(): PostData = request.get_data().decode() Data = json.loads(PostData) JsonData = json.dumps(Data, ensure_ascii=False, indent=4) return JsonDataapp = Flask(__name__)app.config['JSON_AS_ASCII'] = False @app.route('/webhook/', methods=['POST'])def IssueCreate(): qywx = qywx_obj() rds = redis_obj() if not rds.judge_redis_keys("key_for_msg"): rds.redis_cache("key_for_msg",qywx.access_token) if not rds.judge_redis_keys("key_for_info"): USER_INFO = qywx.user_info rds.redis_cache("key_for_info",json.dumps(USER_INFO)) access_token_for_sendmsg = rds.get_access_from_redis("key_for_msg") USER_INFO = json.loads(rds.get_access_from_redis("key_for_info").decode()) description = json.loads(GetData()) alert_info = description['alerts'] for n in range(len(alert_info)): hostname = alert_info[n]['labels']['vm_name'] try: IP = "172.16." hostname.split("-")[2] except: continue level = alert_info[n]['labels']['severity'] Server_supervisor = hostname.split("-")[1] alertname = alert_info[n]['labels']['alertname'] annotations = alert_info[n]['annotations']['summary'] content = ''' 负责人:%s 服务器地址:%s 告警信息:%s 严重程度:%s '''%(Server_supervisor, IP, annotations, level) usermail = "%s@qq.com"%Server_supervisor for index in range(len(USER_INFO)): if USER_INFO[index]["email"] == usermail: TOUSER = USER_INFO[index]["userid"] print(TOUSER, IP) result = qywx.send_data(TOUSER, content, access_token_for_sendmsg) print(result) break return "OK", 200 if __name__ == '__main__': app.run(debug = True, host = '172.16.40.24', port = 6688)
赞 (0)