分类: python/ruby
2020-04-25 14:47:41
前言:
最近研究zabbix告警,网上看了帖子有各式各样姿势:电话语音告警,邮件告警,短信告警,微信公众号告警等等等..姿势五花八门,真是纠结。
电话语音告警,短信告警首先pass 前者花钱,后者通过设置139邮箱,就可以实现伪短信告警效果。
剩下邮件告警与微信公众号告警。邮件告警已经在部署的时候配置完毕,剩下这个微信公众号告警,查一下帖子,申请各种麻烦。那么有没有基于微信个人账号的告警呢?想到这个点,马上github一番。
12345
搜索到一些优秀的开源代码:
及封包:
经过一番思考,大致思路如下:
微信web保持活动状态,通过不断使用zabbix api抓取故障告警,入库后微信发送告警给相关人员。
123
一 需求实现具体思路
实现该需求是得有多个线程同时进行的
1
微信心跳,微信web版保持活动状态。
zabbix api ,不断请求zabbix告警,发现告警后,判断后入库。
使用数据库不断查询告警,如果发现符合条件告警则发送告警给相关人员。
二 部分代码及注释
第一部分:wechat
coding=utf-8
#伪装请求头
headers = {‘user-agent’: ‘mozilla/5.0 (macintosh; intel mac os x 10_11_2) applewebkit/537.36 (khtml, like gecko) chrome/44.0.2403.125 safari/537.36’}
myrequests = requests.session()
myrequests.headers.update(headers)
debug = false
#微信类
class wechat(object):
def init(self):
self.uuid = ‘’
self.base_uri = ‘’
self.push_uri = ‘’
self.redirect_uri = ‘’
self.baserequest = {}
self.skey = ‘’
self.wxuin = ‘’
self.wxsid = ‘’
self.skey = ‘’
self.deviceid = ‘e’ repr(random())[2:17] #随机生成15位机器码
self.pass_ticket = ‘’
self.memberlist =[]
self.contactlist = []
self.alarmfriends =[]
self.intervals = ‘’
self.xintiao = ‘’
#获取uuid
def get_uuid(self):
url = ''
params = {
'appid': 'wx782c26e4c19acffb',
'fun': 'new',
'lang': 'zh_cn',
'_': int(time.time()),
}
r = myrequests.get(url=url, params=params)
r.encoding = 'utf-8'
data = r.text
# data返回,code=200为状态.uuid="iztw06wnsg=="为uuid
# window.qrlogin.code = 200; window.qrlogin.uuid = "iztwo6wnsg==";
# 正则匹配:匹配出状态码 以及uuid
regx = r'window.qrlogin.code = (\d ); window.qrlogin.uuid = "(\s ?)"'
pm = re.search(regx, data)
code = pm.group(1)
if code == '200':
self.uuid = pm.group(2)
return true
return false
# windows下直接打开二维码图
def _openwinqrcodeimg(self):
url = '' self.uuid
params = {
't': 'webwx',
'_': int(time.time())
}
r = myrequests.get(url=url, params=params)
f = open(qrimagepath, 'wb')
f.write(r.content)
f.close()
time.sleep(1)
os.startfile(qrimagepath)
# linux下的二维码处理
def _printqr(self, mat):
for i in mat:
black = '\033[40m \033[0m'
white = '\033[47m \033[0m'
print (''.join([black if j else white for j in i]))
def _str2qr(self, str):
qr = qrcode.qrcode()
qr.border = 1
qr.add_data(str)
mat = qr.get_matrix()
self._printqr(mat) # qr.print_tty() or qr.print_ascii()
# 判断操作系统,选择打开二维码扫描方式
def genqrcode(self):
if sys.platform.startswith('win'):
self._openwinqrcodeimg()
else:
self._str2qr('' self.uuid)
#等待登陆
def waitforlogin(self, tip=1):
time.sleep(tip)
url = '%s&uuid=%s&_=%s' % (
tip, self.uuid, int(time.time()))
r = myrequests.get(url=url)
r.encoding = 'utf-8'
data = r.text
# data返回:
# window.code = 201;
#判断返回码
regx = r'window.code=(\d );'
pm = re.search(regx, data)
code = pm.group(1)
if code == '201': # 已扫描
print('[*]成功扫描,请在手机上点击确认以登录')
elif code == '200': # 已登录
print('[.]正在登录...')
regx = r'window.redirect_uri="(\s ?)";'
pm = re.search(regx, data)
self.redirect_uri = pm.group(1) '&fun=new'
base_uri = self.redirect_uri[:self.redirect_uri.rfind('/')]
# push_uri与base_uri对应关系(排名分先后)
services = [
('wx2.qq.com', 'webpush2.weixin.qq.com'),
('qq.com', 'webpush.weixin.qq.com'),
('web1.wechat.com', 'webpush1.wechat.com'),
('web2.wechat.com', 'webpush2.wechat.com'),
('wechat.com', 'webpush.wechat.com'),
('web1.wechatapp.com', 'webpush1.wechatapp.com'),
]
# self.push_uri = self.base_uri
self.push_uri = base_uri
for (searchurl, pushurl) in services:
if base_uri.find(searchurl) >= 0:
self.push_uri = 'https://%s/cgi-bin/mmwebwx-bin' % pushurl
self.base_uri = 'https://%s/cgi-bin/mmwebwx-bin' % searchurl
break
elif code == '408': # 超时
pass
# elif code == '400' or code == '500':
return code
#登陆
def login(self):
r = myrequests.get(url=self.redirect_uri)
r.encoding = 'utf-8'
data = r.text
# print (data)
# data返回
# < ret > 0 < / ret > < message > ok < / message >
# < skey >xxxx < skey >
# < wxsid > xxxx < / wxsid >
# < wxuin > xxxx < / wxuin >
# < pass_ticket > xxxx < / pass_ticket >
# < isgrayscale > 1 < / isgrayscale >
#解析xml文件
doc = xml.dom.minidom.parsestring(data)
root = doc.documentelement
for node in root.childnodes:
if node.nodename == 'skey':
self.skey = node.childnodes[0].data
elif node.nodename == 'wxsid':
self.wxsid = node.childnodes[0].data
elif node.nodename == 'wxuin':
self. wxuin = node.childnodes[0].data
elif node.nodename == 'pass_ticket':
self.pass_ticket = node.childnodes[0].data
# print('skey: %s, wxsid: %s, wxuin: %s, pass_ticket: %s' % (skey, wxsid,wxuin, pass_ticket))
if not all((self.skey, self.wxsid, self.wxuin, self.pass_ticket)):
return false
self.baserequest = {
'uin': int(self.wxuin),
'sid': self.wxsid,
'skey': self.skey,
'deviceid': self.deviceid,
}
# print (self.push_uri)
return true
def webwxinit(self):
url = ( self.base_uri '/webwxinit?pass_ticket=%s&skey=%s&r=%s' \
% (self.pass_ticket, self.skey, int(time.time())))
params = {'baserequest': self.baserequest}
headers = {'content-type': 'application/json; charset=utf-8'}
r = myrequests.post(url=url, data=json.dumps(params), headers=headers)
r.encoding = 'utf-8'
data = r.json()
self.synckey = data['synckey']
self.user = data['user']
if false:
f = open(os.path.join(os.getcwd(), 'webwxinit.json'), 'wb')
f.write(r.content)
f.close()
self.synckey = '|'.join([str(keyval['key']) '_' str(keyval['val']) for keyval in self.synckey['list']])
state = self.responsestate('webwxinit', data['baseresponse'])
return state
def webwxstatusnotify(self):
url = self.base_uri \
'/webwxstatusnotify?lang=zh_cn&pass_ticket=%s' % (self.pass_ticket)
params = {
'baserequest': self.baserequest,
"code": 3,
"fromusername": self.user['username'],
"tousername": self.user['username'],
"clientmsgid": int(time.time())
}
r = myrequests.post(url=url, params=json.dumps(params))
data = r.json()
state = self.responsestate('wexinstatusnotify',data['baseresponse'])
#return data['baseresponse']['ret'] == 0
return state
#获取好友列表
def webwxgetcontact(self):
url = (self.base_uri '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' % (\
self.pass_ticket, self.skey, int(time.time())))
headers = {'content-type': 'application/json; charset=utf-8'}
r = myrequests.post(url=url, headers=headers)
r.encoding = 'utf-8'
data = r.json()
if false:
f = open(os.path.join(os.getcwd(), 'webwxgetcontact.json'), 'wb')
f.write(r.content)
f.close()
self.memberlist = data['memberlist']
specialusers = ["newsapp", "fmessage", "filehelper", "weibo", "qqmail", "tmessage", "qmessage",\
"qqsync","floatbottle", "lbsapp", "shakeapp", "medianote", "qqfriend", "readerapp",\
"blogapp", "facebookapp", "masssendapp","meishiapp", "feedsapp", "voip",\
"blogappweixin", "weixin", "brandsessionholder", "weixinreminder",\
"wxid_novlwrv3lqwv11", "gh_22b87fa7cb3c", "officialaccounts",\
"notification_messages", "wxitil", "userexperience_alarm"]
#将列表中特殊账号删除
for i in range(len(self.memberlist) - 1, -1, -1):
member = self.memberlist[i]
if member['verifyflag'] & 8 != 0: # 公众号/服务号
self.memberlist.remove(member)
elif member['username'] in specialusers: # 特殊账号
self.memberlist.remove(member)
elif member['username'].find('@@') != -1: # 群聊
self.memberlist.remove(member)
elif member['username'] == self.user: # 自己
self.memberlist.remove(member)
self.contactlist = self.memberlist
return true
#发送信息
def webwxsendmsg(self, word, to='filehelper'):
url = self.base_uri \
'/webwxsendmsg?pass_ticket=%s' % (self.pass_ticket)
clientmsgid = str(int(time.time() * 1000)) \
str(random())[:5].replace('.', '')
params = {
'baserequest':{
"uin": int(self.wxuin),
"sid": self.wxsid,
"skey":self.skey,
"deviceid": self.deviceid,
},
'scene': 0,
'msg':{
"type": 1,
"content": self._transcoding(word),
"fromusername": self.user['username'],
"tousername": to,
"localid": clientmsgid,
"clientmsgid": clientmsgid,
}
}
headers = {'content-type': 'application/json; charset=utf-8'}
data = json.dumps(params, ensure_ascii=false).encode('utf8')
r =myrequests.post(url,data=data,headers=headers)
dic = r.json()
state = self.responsestate('sendmsg', dic['baseresponse'])
print (params)
return state
# print (params)
def wx_views(self):
print ('[.]正在获取好友列表..')
list = self.contactlist
alarmlist=[]
# list = json.dump(list,ensure_ascii=false)
for i in range(0,len(list)):
if list:
list[i]['id'] = i
name = self._untostr(list[i]['nickname'])
rname = self._untostr(list[i]['remarkname'])
id = i
#print (list[i])
print ('\t %d \t姓名:%s \t 备注:%s' %(id,name,rname))
else:
print ('[!]获取失败!')
exit()
while true:
try:
input = raw_input("[.]请设置告警对象id,使用空格隔开\n")
alist = input.split(' ')
except:
print ("[!]输入错误!")
try:
for i in range(0,len(alist)):
if alist:
for j in range(0, len(list)):
if int(alist[i]) == list[j]['id']:
print ('[*]你设置的对象是:%s' % self._untostr(list[j]['nickname']))
alarmlist.append(list[j]['username'])
self.alarmfriends.append(list[j]['username'])
else:
pass
except:
continue
if self.alarmfriends:
input = raw_input ("[!]确认设置(y/n)")
if input == 'y':
self.alarmfriends = alarmlist
break
elif input == 'n':
alarmlist = []
self.alarmfriends = []
pass
else:
alarmlist = []
self.alarmfriends = []
print ("[!]输入错误")
else:
print ("[!]检测不到有效输入,请重试")
print (self.alarmfriends)
def wx_heartbeatloop(self):
while true:
selector = self.synccheck()
if selector != '0':
self.webwxsync()
time.sleep(int(self.xintiao))
print ("[*]wechat心跳正常..")
def run(self):
while true:
time.sleep(5)
sleeptime = int(self.intervals)
print("[*]告警检测心跳..")
time = int(time.time())
lasttime = time - int(sleeptime)
select_sql = "select * from wechat_sendmsg where time between %d and %d" % (lasttime,time)
data = db.select(select_sql)
# print (data)
if data:
for i in data:
print (data )
triggertime = self._untostr(i[0])
hostname = self._untostr(i[2])
hostip = self._untostr(i[3])
description = self._untostr(i[4])
level = self._untostr(i[5])
msg = """
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
[!]发现告警
告警服务器:%s
告警时间:%s
告警ip:%s
告警项:%s
告警级别:%s
“”" %(hostname,triggertime,hostip,description,level)
print (msg)
for j in range(0,len(self.alarmfriends)):
self.webwxsendmsg(msg,self.alarmfriends[j])
# print (j)
# print (self.alarmfriends)
else:
pass
第二部分:zabbix api
from zabbixtriggerdb import sqlitedb
myrequests = requests.session()
db = sqlitedb
class zabbix(object):
def init(self):
self.holist = []
self.zabbix_address = ‘’
self.zabbix_username=’’
self.time = time.strftime(’%y-%m-%d %h:%m’)
self.passwd = ‘’
self.z_intervals = ‘’
self.w_intervals = ‘’
self.sleeptime = ‘’
self.trigger= []
self.lasttrigger = []
self.wxtriggerlist = []
#获取zabbix api token
def get_auth(self):
url = '%s/api_jsonrpc.php' % self.zabbix_address
params = json.dumps({
"jsonrpc": "2.0",
"method": "user.login",
"params": {
"user": self.zabbix_username,
"password": self.passwd
},
"id": 0
})
headers = {'content-type': 'application/json; charset=utf-8'}
r = myrequests.post(url=url, data=params, headers=headers)
r.encoding = 'utf-8'
data = r.json()
return data['result']
#获取zabbix监控主机列表
def get_host(self):
url = '%s/api_jsonrpc.php' % self.zabbix_address
params = json.dumps({
"jsonrpc": "2.0",
"method": "host.get",
"params": {
"output":[
"hostid",
"name"
],
"selectinterfaces":[
"interfaceid",
"ip",
]
},
"id":2,
"auth":self.get_auth()
})
headers = {'content-type': 'application/json; charset=utf-8'}
r = myrequests.post(url=url, data=params, headers=headers)
r.encoding = 'utf-8'
data = r.json()
self.holist = data['result']
return self.holist
#获取告警
def get_trig(self,hostid):
url = '%s/api_jsonrpc.php' % self.zabbix_address
params = json.dumps({
"jsonrpc":"2.0",
"method":"trigger.get",
"params": {
"output": [
"triggerid",
"description",
"priority"
],
"filter": {
"value": 1,
"hostid":hostid
},
"sortfield": "priority",
"sortorder": "desc"
},
"auth": self.get_auth(),
"id":1
})
headers = {'content-type': 'application/json; charset=utf-8'}
r = myrequests.post(url=url, data=params, headers=headers)
r.encoding = 'utf-8'
data = r.json()
if data['result']:
# text = json.dumps(data,ensure_ascii=false)
return data['result']
else:
return none
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
#告警信息入库
def get_triggerlist(self):
list = self.holist
if list:
for i in range(0,len(list)):
# ip = self._untostr(list[i][‘interfaces’][‘ip’])
trigger = self.get_trig(list[i][‘hostid’])
level = {‘1’:‘disaster’,‘2’:‘high’,‘3’:‘average’,‘4’:‘warning’,‘5’:‘information’,
‘6’:‘not classified’}
if trigger != none:
trigger = self._untostr(trigger[0][‘description’])
level = self._untostr(trigger[0][‘priority’])
name = self._untostr(list[i][‘name’])
ip = self._untostr(list[i][‘interfaces’][0][‘ip’])
datatime = time.strftime("%y-%m-%d %h:%m", time.localtime())
time = int(time.time())
z_lasttime = time - int(self.z_intervals)
w_lasttime = time - int(self.w_intervals)
#这个地方先查询在间隔时间段内有没有存在相同数据,如果没有就插入,有就跳过
zabbix_sql = "select * from zabbix_trigger where hostname='%s' and \
description='%s' and time between %d and %d " % (name,trigger,z_lasttime,time)
z_data = db.select(zabbix_sql)
if z_data:
pass
else:
z_inset_sql = "insert into zabbix_trigger(data,time,hostname,hostip,description,level)\
values('%s',%d,'%s','%s','%s','%s');" % (datatime,time,name,ip,trigger,level[level])
db.insert(z_inset_sql)
#判断间隔时间内wechat_sendmsg表中是否存在相同输入,没有则插入
wechat_sql = "select * from wechat_sendmsg where hostname='%s' and \
description='%s' and time between %d and %d limit 1" % (name, trigger, w_lasttime, time)
w_data = db.select(wechat_sql)
if w_data:
pass
else:
w_inset_sql = "insert into wechat_sendmsg(data,time,hostname,hostip,description,level)\
values('%s',%d,'%s','%s','%s','%s');" % (datatime, time, name, ip, trigger, level[level])
db.insert(w_inset_sql)
#这里思路是 设定时间内获取告警信息存入zabbix表,
#然后再另外设定一个时间写入weixin表 这样做是为了一个时间范围内不重复告警
else:
print ("[!]获取主机列表失败,正在重新获取...")
self.get_auth()
self.get_host()
self.get_triggerlist()
1234567891011121314151617181920212223242526272829
#zabbix 心跳
def run(self):
while true:
print ("[*]zabbix心跳正常…")
time.sleep(self.sleeptime)
self.get_triggerlist()
第三部分:sqlite
coding=utf-8
import sqlite3,os
sqlitedb = os.path.join(os.getcwd(), ‘triggerdb.db’)
dbcon = sqlite3.connect(sqlitedb,check_same_thread=false) #多线程操作要开启这个选项
dbcur = dbcon.cursor()
class sqlitedb(object):
@staticmethod
def insert(sql):
try:
dbcur.execute(sql)
except sqlite3.error as e:
print ("[!]insert error! %s" % e.args[0])
dbcon.commit()
@staticmethod
def select(sql):
data = []
try:
dbcur.execute(sql)
data = dbcur.fetchall()
except sqlite3.error as e:
print ("[!]slect error!%s"% e.args[0])
return data
#初始化,新建两个表
@staticmethod
def creattable():
zabbix_sql ="create table if not exists zabbix_trigger (data text,time integer,hostname text, \
hostip text,description text,level text);"
weixin_sql = "create table if not exists wechat_sendmsg (data text,time integer,hostname text, \
hostip text,description text,level text);"
try:
dbcur.execute(zabbix_sql)
dbcur.execute(weixin_sql)
except sqlite3.error as e:
print ("[!]creat error! %s" % e.args[0])
123456789101112131415161718192021222324252627282930
第四部分:合体
coding=utf-8
from zabbix import zabbix
from zabbixtriggerdb import sqlitedb
from wechat import wechat
import os,sys,thread
if name == ‘main’:
db = sqlitedb
db.creattable()
z = zabbix() #zabbix类
w = wechat() #wechat类
z.zabbix_address = '服务器地址'
z.zabbix_username = 'zabbix用户'
z.passwd = 'zabbix密码'
z.z_intervals = 600 #zabbix告警入库间隔
z.w_intervals = 3600 #wechat告警入库间隔
z.sleeptime = 10 #zabbix心跳间隔
w.intervals = 3 #告警检测心跳间隔
w.xintiao = 2 #微信心跳间隔
z.get_auth() #zabbix token
z.get_host() #zabbix hostlist
z.get_triggerlist() #zabbix triggerlist
if not w.get_uuid():
print('[!]获取uuid失败,请重新运行!')
print('[*]正在获取二维码图片...')
w.genqrcode() #获取二维码
while w.waitforlogin() != '200':
pass
w.login() #登陆
w.webwxinit() #初始化
w.webwxgetcontact() #获取好友列表
w.wx_views() #设置告警好友
1234567891011121314151617181920212223242526272829303132333435
#定义一个线程方法,加入zabbix运行线程与微信发送告警线程
def run():
thread.start_new(z.run, ())
thread.start_new(w.run, ())
run()
#启动微信心跳(让微信保持在线状态)
w.wx_heartbeatloop()
12
最终效果:
wkiom1eqeb_hmsq1aavsd5rfseg653.png-wh_50
wkiol1eqeclcuc69aavjp8lldpi740.png-wh_50
wkiom1eqecbw8ci8aaetjbfsqg4621.png-wh_50
wkiom1eqecuz2kbaaai3sz-lefk380.png-wh_50
wkiol1eqecvzvp4qaaeas_empzi880.png-wh_50
: