zabbix微信个人账号告警-凯发app官方网站

凯发app官方网站-凯发k8官网下载客户端中心 | | 凯发app官方网站-凯发k8官网下载客户端中心
  • 博客访问: 13266
  • 博文数量: 4
  • 博客积分: 0
  • 博客等级: 民兵
  • 技术积分: 30
  • 用 户 组: 普通用户
  • 注册时间: 2020-04-18 10:16
文章分类
文章存档

2020年(4)

我的朋友
最近访客
相关博文
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·
  • ·

分类: python/ruby

2020-04-25 14:47:41

  前言:

  最近在研究 zabbix 告警,在网上看了不少帖子,告警方式各式各样:电话语音告警、邮件告警、短信告警、微信公众号告警等等,五花八门,让人纠结。

  电话语音告警和短信告警首先排除:前者要花钱;后者通过设置 139 邮箱,就可以实现伪短信告警的效果。

  剩下邮件告警与微信公众号告警。邮件告警已经在部署的时候配置完毕;至于微信公众号告警,查了一下帖子,申请流程十分麻烦。那么有没有基于微信个人账号的告警呢?想到这一点,马上去 github 搜索了一番。

  搜索到一些优秀的开源代码:

  及封包:

  经过一番思考,大致思路如下:

  让微信 web 版保持活动状态,同时不断通过 zabbix api 抓取故障告警,入库后再由微信把告警发送给相关人员。

  一 需求实现具体思路

  实现该需求需要多个线程同时运行:

  微信心跳,微信web版保持活动状态。

  zabbix api ,不断请求zabbix告警,发现告警后,判断后入库。

  使用数据库不断查询告警,如果发现符合条件告警则发送告警给相关人员。

  二 部分代码及注释

  第一部分:wechat

  # coding=utf-8

  # Disguised request headers: every request carries a desktop-browser
  # user-agent string.
  headers = {'user-agent': 'mozilla/5.0 (macintosh; intel mac os x 10_11_2) applewebkit/537.36 (khtml, like gecko) chrome/44.0.2403.125 safari/537.36'}

  # One shared HTTP session so cookies persist between the login steps.
  myrequests = requests.session()
  myrequests.headers.update(headers)

  debug = False

  #微信类

  class wechat(object):

  def init(self):

  self.uuid = ‘’

  self.base_uri = ‘’

  self.push_uri = ‘’

  self.redirect_uri = ‘’

  self.baserequest = {}

  self.skey = ‘’

  self.wxuin = ‘’

  self.wxsid = ‘’

  self.skey = ‘’

  self.deviceid = ‘e’ repr(random())[2:17] #随机生成15位机器码

  self.pass_ticket = ‘’

  self.memberlist =[]

  self.contactlist = []

  self.alarmfriends =[]

  self.intervals = ‘’

  self.xintiao = ‘’

  #获取uuid

  def get_uuid(self):
      """Request a login UUID and store it in ``self.uuid``.

      Returns:
          True when the reply carries status code 200 (the UUID is then
          saved on the instance); False when the status differs or the
          reply does not have the expected format.
      """
      # NOTE(review): the URL literal is empty in the published listing
      # (apparently stripped); the real endpoint must be filled in.
      url = ''
      params = {
          'appid': 'wx782c26e4c19acffb',
          'fun': 'new',
          'lang': 'zh_cn',
          '_': int(time.time()),
      }
      r = myrequests.get(url=url, params=params)
      r.encoding = 'utf-8'
      data = r.text
      # Expected reply, per the original comment:
      #   window.qrlogin.code = 200; window.qrlogin.uuid = "iztwo6wnsg==";
      # The "+" quantifiers were missing, so the pattern could not match a
      # multi-digit code or a real uuid.  IGNORECASE because the listing was
      # lower-cased and the reply's actual casing cannot be confirmed here.
      regx = r'window\.qrlogin\.code = (\d+); window\.qrlogin\.uuid = "(\S+?)"'
      pm = re.search(regx, data, re.IGNORECASE)
      if pm is None:
          return False
      if pm.group(1) == '200':
          self.uuid = pm.group(2)
          return True
      return False

  # windows下直接打开二维码图

  def _openwinqrcodeimg(self):
      """Download the login QR image to ``qrimagepath`` and open it.

      Uses ``os.startfile``, so it is only called on Windows (see
      ``genqrcode``).
      """
      # NOTE(review): the base URL literal is empty in the published
      # listing (apparently stripped); only the uuid suffix survived.
      url = '' + self.uuid
      params = {
          't': 'webwx',
          '_': int(time.time())
      }
      r = myrequests.get(url=url, params=params)
      # "with" closes the file even if the write fails.
      with open(qrimagepath, 'wb') as f:
          f.write(r.content)
      time.sleep(1)
      os.startfile(qrimagepath)

  # linux下的二维码处理

  def _printqr(self, mat):

  for i in mat:

  black = '\033[40m \033[0m'

  white = '\033[47m \033[0m'

  print (''.join([black if j else white for j in i]))

  def _str2qr(self, str):
      """Encode ``str`` as a QR code and print it to the terminal.

      Args:
          str: the text to encode (the parameter name shadows the builtin;
              kept unchanged for caller compatibility).
      """
      # The class exported by the qrcode package is ``QRCode``; the
      # lower-cased ``qrcode.qrcode`` in the listing does not exist.
      qr = qrcode.QRCode()
      qr.border = 1
      qr.add_data(str)
      mat = qr.get_matrix()
      self._printqr(mat)  # qr.print_tty() or qr.print_ascii()

  # 判断操作系统,选择打开二维码扫描方式

  def genqrcode(self):
      """Show the login QR code in the way that suits the platform.

      On Windows the image is downloaded and opened; elsewhere the code is
      rendered as text in the terminal.
      """
      if sys.platform.startswith('win'):
          self._openwinqrcodeimg()
      else:
          # NOTE(review): the URL prefix literal is empty in the published
          # listing (apparently stripped); only the uuid suffix survived.
          self._str2qr('' + self.uuid)

  #等待登陆

  def waitforlogin(self, tip=1):
      """Poll the login status once and return the status code string.

      Args:
          tip: value sent as the first URL field; also the number of
              seconds slept before the request.

      Returns:
          The status code as a string ('201' scanned, '200' logged in,
          '408' timed out, ...), or None when the reply could not be
          parsed.  On '200' the redirect, base and push URIs are stored
          on the instance.
      """
      time.sleep(tip)
      # NOTE(review): the leading part of this URL is missing in the
      # published listing (apparently stripped) and must be restored.
      url = '%s&uuid=%s&_=%s' % (
          tip, self.uuid, int(time.time()))
      r = myrequests.get(url=url)
      r.encoding = 'utf-8'
      data = r.text
      # Reply looks like "window.code = 201;".  The "+" quantifier had been
      # lost; optional whitespace around "=" covers both spellings that
      # appear in the listing.
      pm = re.search(r'window\.code\s*=\s*(\d+);', data, re.IGNORECASE)
      if pm is None:
          return None
      code = pm.group(1)
      if code == '201':  # scanned
          print('[*]成功扫描,请在手机上点击确认以登录')
      elif code == '200':  # logged in
          print('[.]正在登录...')
          pm = re.search(r'window\.redirect_uri\s*=\s*"(\S+?)";', data,
                         re.IGNORECASE)
          if pm is None:
              return None
          self.redirect_uri = pm.group(1) + '&fun=new'
          base_uri = self.redirect_uri[:self.redirect_uri.rfind('/')]
          # base host fragment -> push host; order matters, first match wins.
          services = [
              ('wx2.qq.com', 'webpush2.weixin.qq.com'),
              ('qq.com', 'webpush.weixin.qq.com'),
              ('web1.wechat.com', 'webpush1.wechat.com'),
              ('web2.wechat.com', 'webpush2.wechat.com'),
              ('wechat.com', 'webpush.wechat.com'),
              ('web1.wechatapp.com', 'webpush1.wechatapp.com'),
          ]
          # Keep the base taken from the actual redirect.  The original left
          # self.base_uri empty when no entry matched, and otherwise rebuilt
          # it from the matched fragment (e.g. plain 'qq.com'), losing the
          # real host name.
          self.base_uri = base_uri
          self.push_uri = base_uri
          for (searchurl, pushurl) in services:
              if base_uri.find(searchurl) >= 0:
                  self.push_uri = 'https://%s/cgi-bin/mmwebwx-bin' % pushurl
                  break
      elif code == '408':  # timed out
          pass
      return code

  #登陆

  def login(self):
      """Fetch the redirect page and extract the session credentials.

      The XML reply is scanned for the ``skey``, ``wxsid``, ``wxuin`` and
      ``pass_ticket`` elements; their text is stored on the instance and
      ``self.baserequest`` is built from them.

      Returns:
          True when all four values were found, otherwise False.
      """
      r = myrequests.get(url=self.redirect_uri)
      r.encoding = 'utf-8'
      data = r.text
      # Reply shape (from the original comments):
      #   <ret>0</ret><message>ok</message><skey>xxxx</skey>
      #   <wxsid>xxxx</wxsid><wxuin>xxxx</wxuin>
      #   <pass_ticket>xxxx</pass_ticket><isgrayscale>1</isgrayscale>
      doc = xml.dom.minidom.parseString(data)
      root = doc.documentElement
      # Element names equal the attribute names they are stored under.
      wanted = ('skey', 'wxsid', 'wxuin', 'pass_ticket')
      for node in root.childNodes:
          # Skip empty elements instead of raising IndexError.
          if node.nodeName in wanted and node.childNodes:
              setattr(self, node.nodeName, node.childNodes[0].data)
      if not all((self.skey, self.wxsid, self.wxuin, self.pass_ticket)):
          return False
      # NOTE(review): keys are reproduced exactly as published (lower-case);
      # the listing appears to have been lower-cased as a whole, so confirm
      # the key casing the server expects.
      self.baserequest = {
          'uin': int(self.wxuin),
          'sid': self.wxsid,
          'skey': self.skey,
          'deviceid': self.deviceid,
      }
      return True

  def webwxinit(self):
      """Initialise the web session and remember the account and sync key.

      Stores the ``user`` entry of the reply in ``self.user`` and the sync
      key list, flattened to ``key_val|key_val|...``, in ``self.synckey``.

      Returns:
          Whatever ``self.responsestate`` returns for the reply's
          ``baseresponse`` entry.
      """
      url = self.base_uri + '/webwxinit?pass_ticket=%s&skey=%s&r=%s' % (
          self.pass_ticket, self.skey, int(time.time()))
      # NOTE(review): JSON keys are reproduced exactly as published
      # (lower-case); the listing appears to have been lower-cased as a
      # whole, so confirm the casing used by the live API.
      params = {'baserequest': self.baserequest}
      headers = {'content-type': 'application/json; charset=utf-8'}
      r = myrequests.post(url=url, data=json.dumps(params), headers=headers)
      r.encoding = 'utf-8'
      data = r.json()
      self.user = data['user']
      # The permanently disabled "if false:" dump of the raw reply to
      # webwxinit.json was dead code and has been dropped.
      self.synckey = '|'.join(
          str(keyval['key']) + '_' + str(keyval['val'])
          for keyval in data['synckey']['list'])
      state = self.responsestate('webwxinit', data['baseresponse'])
      return state

  def webwxstatusnotify(self):
      """Send a status notification from the logged-in account to itself.

      Returns:
          Whatever ``self.responsestate`` returns for the reply's
          ``baseresponse`` entry.
      """
      url = self.base_uri + \
          '/webwxstatusnotify?lang=zh_cn&pass_ticket=%s' % (self.pass_ticket)
      # NOTE(review): JSON keys are reproduced exactly as published
      # (lower-case); confirm the casing used by the live API.
      params = {
          'baserequest': self.baserequest,
          "code": 3,
          "fromusername": self.user['username'],
          "tousername": self.user['username'],
          "clientmsgid": int(time.time())
      }
      # The JSON document belongs in the request body (data=), as in
      # webwxinit/webwxsendmsg; passing it as params= would append it to
      # the query string instead.
      r = myrequests.post(url=url, data=json.dumps(params))
      data = r.json()
      state = self.responsestate('wexinstatusnotify', data['baseresponse'])
      return state

  #获取好友列表

  def webwxgetcontact(self):
      """Download the contact list and keep only ordinary friends.

      Official/service accounts (``verifyflag`` bit 8), built-in special
      accounts, group chats (user name containing ``@@``) and the logged-in
      account itself are removed.  The result is stored in both
      ``self.memberlist`` and ``self.contactlist``.

      Returns:
          Always True.
      """
      url = (self.base_uri + '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' % (
          self.pass_ticket, self.skey, int(time.time())))
      headers = {'content-type': 'application/json; charset=utf-8'}
      r = myrequests.post(url=url, headers=headers)
      r.encoding = 'utf-8'
      data = r.json()
      specialusers = ["newsapp", "fmessage", "filehelper", "weibo", "qqmail", "tmessage", "qmessage",
                      "qqsync", "floatbottle", "lbsapp", "shakeapp", "medianote", "qqfriend", "readerapp",
                      "blogapp", "facebookapp", "masssendapp", "meishiapp", "feedsapp", "voip",
                      "blogappweixin", "weixin", "brandsessionholder", "weixinreminder",
                      "wxid_novlwrv3lqwv11", "gh_22b87fa7cb3c", "officialaccounts",
                      "notification_messages", "wxitil", "userexperience_alarm"]
      # self.user is the account dict stored by webwxinit; the original
      # compared the user name against the whole dict, which never matched.
      own_name = self.user['username']
      # NOTE(review): reply keys are reproduced exactly as published
      # (lower-case); confirm the casing used by the live API.
      kept = []
      for member in data['memberlist']:
          username = member['username']
          if member['verifyflag'] & 8 != 0:  # official / service account
              continue
          if username in specialusers:  # special account
              continue
          if '@@' in username:  # group chat
              continue
          if username == own_name:  # self
              continue
          kept.append(member)
      self.memberlist = kept
      self.contactlist = self.memberlist
      return True

  #发送信息

  def webwxsendmsg(self, word, to='filehelper'):
      """Send a text message.

      Args:
          word: message text; passed through ``self._transcoding`` first.
          to: user name of the recipient (default ``'filehelper'``).

      Returns:
          Whatever ``self.responsestate`` returns for the reply's
          ``baseresponse`` entry.
      """
      url = self.base_uri + \
          '/webwxsendmsg?pass_ticket=%s' % (self.pass_ticket)
      # Millisecond timestamp followed by a few random digits.
      clientmsgid = str(int(time.time() * 1000)) + \
          str(random())[:5].replace('.', '')
      # NOTE(review): JSON keys are reproduced exactly as published
      # (lower-case); confirm the casing used by the live API.
      params = {
          'baserequest': {
              "uin": int(self.wxuin),
              "sid": self.wxsid,
              "skey": self.skey,
              "deviceid": self.deviceid,
          },
          'scene': 0,
          'msg': {
              "type": 1,
              "content": self._transcoding(word),
              "fromusername": self.user['username'],
              "tousername": to,
              "localid": clientmsgid,
              "clientmsgid": clientmsgid,
          }
      }
      headers = {'content-type': 'application/json; charset=utf-8'}
      # ensure_ascii=False keeps non-ASCII text readable in the body.
      data = json.dumps(params, ensure_ascii=False).encode('utf8')
      r = myrequests.post(url, data=data, headers=headers)
      dic = r.json()
      state = self.responsestate('sendmsg', dic['baseresponse'])
      # The request dict is no longer printed: it contains the session's
      # sid/skey and was written to stdout on every send.
      return state

  # print (params)

  def wx_views(self):
      """List the contacts and let the operator choose the alarm recipients.

      Every contact is printed with a numeric id (also stored under its
      ``'id'`` key).  The operator enters ids separated by spaces and
      confirms with ``y``; the chosen user names end up in
      ``self.alarmfriends``.  The prompt repeats until a non-empty
      selection is confirmed.
      """
      print ('[.]正在获取好友列表..')
      contacts = self.contactlist
      # Checked up front: in the original the failure branch sat inside the
      # loop over the list and could never run for an empty list.
      if not contacts:
          print ('[!]获取失败!')
          sys.exit()
      for idx, contact in enumerate(contacts):
          contact['id'] = idx
          name = self._untostr(contact['nickname'])
          rname = self._untostr(contact['remarkname'])
          print ('\t %d \t姓名:%s \t 备注:%s' % (idx, name, rname))

      # raw_input on Python 2, input on Python 3.
      try:
          read_line = raw_input
      except NameError:
          read_line = input

      while True:
          answer = read_line("[.]请设置告警对象id,使用空格隔开\n")
          try:
              wanted = [int(token) for token in answer.split()]
          except ValueError:
              print ("[!]输入错误!")
              continue
          # Rebuilt on every attempt so a rejected or invalid entry cannot
          # leak into the next one.
          chosen = []
          for number in wanted:
              # The printed ids are the list positions.
              if 0 <= number < len(contacts):
                  print ('[*]你设置的对象是:%s' % self._untostr(contacts[number]['nickname']))
                  chosen.append(contacts[number]['username'])
          if not chosen:
              print ("[!]检测不到有效输入,请重试")
              continue
          confirm = read_line("[!]确认设置(y/n)")
          if confirm == 'y':
              self.alarmfriends = chosen
              break
          if confirm != 'n':
              print ("[!]输入错误")
      print (self.alarmfriends)

  def wx_heartbeatloop(self):
      """Keep the web session alive; never returns.

      Each round calls ``self.synccheck()``, runs ``self.webwxsync()``
      when the returned selector is not ``'0'``, then sleeps
      ``self.xintiao`` seconds.
      """
      while True:
          selector = self.synccheck()
          if selector != '0':
              self.webwxsync()
          time.sleep(int(self.xintiao))
          print ("[*]wechat心跳正常..")

  def run(self):
      """Forward new rows of ``wechat_sendmsg`` to every alarm recipient.

      Runs forever.  Every 5 seconds the table is queried for rows whose
      ``time`` lies in the window since the previous query, and each row
      is formatted and sent to every entry of ``self.alarmfriends``.
      """
      window_start = None
      while True:
          time.sleep(5)
          print("[*]告警检测心跳..")
          # Only fully elapsed seconds are queried, and the next window
          # starts right after this one.  The original looked back a fixed
          # self.intervals seconds on every round, which skips rows when
          # that is shorter than the 5 s sleep and re-sends rows when it is
          # longer.  It also bound a local named "time", which made the
          # time.sleep() call above fail with UnboundLocalError.
          window_end = int(time.time()) - 1
          if window_start is None:
              window_start = window_end - int(self.intervals)
          select_sql = "select * from wechat_sendmsg where time between %d and %d" % (window_start, window_end)
          data = db.select(select_sql)
          window_start = window_end + 1
          for row in data:
              # Column order: data, time, hostname, hostip, description, level
              triggertime = self._untostr(row[0])
              hostname = self._untostr(row[2])
              hostip = self._untostr(row[3])
              description = self._untostr(row[4])
              level = self._untostr(row[5])
              msg = ("\n[!]发现告警\n"
                     "告警服务器:%s\n"
                     "告警时间:%s\n"
                     "告警ip:%s\n"
                     "告警项:%s\n"
                     "告警级别:%s\n"
                     % (hostname, triggertime, hostip, description, level))
              print (msg)
              for friend in self.alarmfriends:
                  self.webwxsendmsg(msg, friend)

  第二部分:zabbix api

  from zabbixtriggerdb import sqlitedb

  # Shared HTTP session used by the Zabbix API calls in this module.
  myrequests = requests.session()

  # Short alias for the SQLite helper class imported above.
  db = sqlitedb

  class zabbix(object):

  def init(self):

  self.holist = []

  self.zabbix_address = ‘’

  self.zabbix_username=’’

  self.time = time.strftime(’%y-%m-%d %h:%m’)

  self.passwd = ‘’

  self.z_intervals = ‘’

  self.w_intervals = ‘’

  self.sleeptime = ‘’

  self.trigger= []

  self.lasttrigger = []

  self.wxtriggerlist = []

  #获取zabbix api token

  def get_auth(self):
      """Log in to the Zabbix JSON-RPC API and return the reply's ``result``.

      Posts a ``user.login`` request with the configured user name and
      password to ``<zabbix_address>/api_jsonrpc.php``.
      """
      endpoint = '%s/api_jsonrpc.php' % self.zabbix_address
      credentials = {
          "user": self.zabbix_username,
          "password": self.passwd
      }
      payload = {
          "jsonrpc": "2.0",
          "method": "user.login",
          "params": credentials,
          "id": 0
      }
      reply = myrequests.post(
          url=endpoint,
          data=json.dumps(payload),
          headers={'content-type': 'application/json; charset=utf-8'})
      reply.encoding = 'utf-8'
      return reply.json()['result']

  #获取zabbix监控主机列表

  def get_host(self):
      """Fetch the monitored hosts and cache them in ``self.holist``.

      Each host is requested with its ``hostid``, ``name`` and, per
      interface, ``interfaceid`` and ``ip``.

      Returns:
          The host list (also stored in ``self.holist``); an empty list
          when the reply has no ``result`` entry.
      """
      url = '%s/api_jsonrpc.php' % self.zabbix_address
      params = json.dumps({
          "jsonrpc": "2.0",
          "method": "host.get",
          "params": {
              "output": [
                  "hostid",
                  "name"
              ],
              # Zabbix parameter names are case-sensitive: the lower-cased
              # "selectinterfaces" is not a host.get parameter, so the
              # reply would lack the "interfaces" property that
              # get_triggerlist reads.
              "selectInterfaces": [
                  "interfaceid",
                  "ip",
              ]
          },
          "id": 2,
          "auth": self.get_auth()
      })
      headers = {'content-type': 'application/json; charset=utf-8'}
      r = myrequests.post(url=url, data=params, headers=headers)
      r.encoding = 'utf-8'
      data = r.json()
      # A reply without "result" yields an empty list, not a KeyError.
      self.holist = data.get('result') or []
      return self.holist

  #获取告警

  def get_trig(self, hostid):
      """Return the triggers in problem state for one host.

      Args:
          hostid: the Zabbix host id to filter on.

      Returns:
          A non-empty list of trigger dicts (``triggerid``,
          ``description``, ``priority``), sorted by priority descending,
          or None when nothing is returned.
      """
      url = '%s/api_jsonrpc.php' % self.zabbix_address
      params = json.dumps({
          "jsonrpc": "2.0",
          "method": "trigger.get",
          "params": {
              "output": [
                  "triggerid",
                  "description",
                  "priority"
              ],
              "filter": {
                  "value": 1,
                  "hostid": hostid
              },
              "sortfield": "priority",
              "sortorder": "desc"
          },
          "auth": self.get_auth(),
          "id": 1
      })
      headers = {'content-type': 'application/json; charset=utf-8'}
      r = myrequests.post(url=url, data=params, headers=headers)
      r.encoding = 'utf-8'
      data = r.json()
      # An empty result, or a reply without "result", both map to None.
      return data.get('result') or None

  #告警信息入库

  def get_triggerlist(self):
      """Record the top-priority problem trigger of every cached host.

      For each host in ``self.holist`` the first trigger returned by
      ``get_trig`` (sorted by priority, descending) is written to
      ``zabbix_trigger`` and ``wechat_sendmsg``.  A row is inserted only
      when the table has no row with the same host name and description
      within the last ``z_intervals`` / ``w_intervals`` seconds, so one
      alarm is not repeated inside those windows.

      When the cached host list is empty it is fetched once more; if it is
      still empty the method returns and the next heartbeat retries.
      """
      hosts = self.holist
      if not hosts:
          print ("[!]获取主机列表失败,正在重新获取...")
          # One refresh instead of unbounded recursion.  get_host() logs in
          # by itself, so the separate get_auth() call was redundant.
          self.get_host()
          hosts = self.holist
          if not hosts:
              return

      # Zabbix trigger severities.  The published map started at '1' with
      # 'disaster', i.e. inverted, and had no entry for '0'.
      severity_names = {
          '0': 'not classified',
          '1': 'information',
          '2': 'warning',
          '3': 'average',
          '4': 'high',
          '5': 'disaster',
      }

      def quote(value):
          # Double single quotes so host names or descriptions containing
          # "'" cannot break the string-built SQL below.
          return ('%s' % value).replace("'", "''")

      for host in hosts:
          found = self.get_trig(host['hostid'])
          if found is None:
              continue
          # Separate names throughout: the listing reused "trigger",
          # "level" and "time" for different things, which broke the later
          # lookups and shadowed the time module.
          description = self._untostr(found[0]['description'])
          priority = self._untostr(found[0]['priority'])
          level = severity_names.get(priority, priority)
          name = self._untostr(host['name'])
          ip = self._untostr(host['interfaces'][0]['ip'])
          datatime = time.strftime("%Y-%m-%d %H:%M", time.localtime())
          now = int(time.time())
          windows = (
              ('zabbix_trigger', now - int(self.z_intervals)),
              ('wechat_sendmsg', now - int(self.w_intervals)),
          )
          for table, since in windows:
              select_sql = (
                  "select * from %s where hostname='%s' and "
                  "description='%s' and time between %d and %d limit 1"
                  % (table, quote(name), quote(description), since, now))
              if db.select(select_sql):
                  continue
              insert_sql = (
                  "insert into %s(data,time,hostname,hostip,description,level) "
                  "values('%s',%d,'%s','%s','%s','%s');"
                  % (table, quote(datatime), now, quote(name), quote(ip),
                     quote(description), quote(level)))
              db.insert(insert_sql)

  #zabbix 心跳

  def run(self):
      """Zabbix heartbeat: collect triggers every ``self.sleeptime`` seconds.

      Never returns.
      """
      while True:
          print ("[*]zabbix心跳正常…")
          time.sleep(self.sleeptime)
          self.get_triggerlist()

  第三部分:sqlite

  # coding=utf-8

  import sqlite3
  import os

  # Database file in the current working directory.  (The class defined
  # further down rebinds the name "sqlitedb"; the path is only needed for
  # the connect() call below.)
  sqlitedb = os.path.join(os.getcwd(), 'triggerdb.db')

  # check_same_thread=False: the connection is used from several threads.
  dbcon = sqlite3.connect(sqlitedb, check_same_thread=False)
  dbcur = dbcon.cursor()

  class sqlitedb(object):
      """Static helpers around the module-level connection and cursor."""

      @staticmethod
      def insert(sql, params=()):
          """Execute a data-changing statement and commit.

          Args:
              sql: the SQL text; may contain ``?`` placeholders.
              params: values bound to the placeholders (default: none), so
                  callers need not build SQL by string formatting.

          SQLite errors are printed, not raised.
          """
          try:
              dbcur.execute(sql, params)
          except sqlite3.Error as e:
              print ("[!]insert error! %s" % e.args[0])
          dbcon.commit()

      @staticmethod
      def select(sql, params=()):
          """Run a query and return all rows.

          Args:
              sql: the SQL text; may contain ``?`` placeholders.
              params: values bound to the placeholders (default: none).

          Returns:
              A list of row tuples; an empty list when the query fails
              (the error is printed).
          """
          data = []
          try:
              dbcur.execute(sql, params)
              data = dbcur.fetchall()
          except sqlite3.Error as e:
              print ("[!]slect error!%s"% e.args[0])
          return data

      @staticmethod
      def creattable():
          """Create the two alarm tables when they do not exist yet."""
          columns = ("(data text,time integer,hostname text, "
                     "hostip text,description text,level text);")
          zabbix_sql = "create table if not exists zabbix_trigger " + columns
          weixin_sql = "create table if not exists wechat_sendmsg " + columns
          try:
              dbcur.execute(zabbix_sql)
              dbcur.execute(weixin_sql)
          except sqlite3.Error as e:
              print ("[!]creat error! %s" % e.args[0])

  第四部分:合体

  coding=utf-8

  from zabbix import zabbix

  from zabbixtriggerdb import sqlitedb

  from wechat import wechat

  import os,sys,thread

  if __name__ == '__main__':
      # Imported here so this block does not depend on the Python 2-only
      # "thread" module imported at the top of the file.
      import threading

      db = sqlitedb
      db.creattable()

      z = zabbix()  # zabbix client
      w = wechat()  # wechat client

      z.zabbix_address = '服务器地址'
      z.zabbix_username = 'zabbix用户'
      z.passwd = 'zabbix密码'
      z.z_intervals = 600   # de-duplication window for zabbix_trigger (s)
      z.w_intervals = 3600  # de-duplication window for wechat_sendmsg (s)
      z.sleeptime = 10      # zabbix heartbeat interval (s)
      w.intervals = 3       # alarm-check interval (s)
      w.xintiao = 2         # wechat heartbeat interval (s)

      z.get_auth()          # zabbix token
      z.get_host()          # zabbix host list
      z.get_triggerlist()   # zabbix trigger list

      if not w.get_uuid():
          print('[!]获取uuid失败,请重新运行!')
          # Stop here: without a uuid no later step can succeed.
          sys.exit(1)
      print('[*]正在获取二维码图片...')
      w.genqrcode()         # show the QR code
      while w.waitforlogin() != '200':
          pass
      if not w.login():     # log in; abort when credentials are missing
          print('[!]登录失败,请重新运行!')
          sys.exit(1)
      w.webwxinit()         # initialise the session
      w.webwxgetcontact()   # fetch the contact list
      w.wx_views()          # choose the alarm recipients

      # Start the zabbix polling thread and the wechat alarm-sending thread.
      def run():
          for target in (z.run, w.run):
              worker = threading.Thread(target=target)
              # Daemon threads end with the main thread, as threads started
              # through the low-level thread module did.
              worker.daemon = True
              worker.start()

      run()

      # WeChat heartbeat in the main thread keeps the web session online.
      w.wx_heartbeatloop()

  最终效果:

  wkiom1eqeb_hmsq1aavsd5rfseg653.png-wh_50

  wkiol1eqeclcuc69aavjp8lldpi740.png-wh_50

  wkiom1eqecbw8ci8aaetjbfsqg4621.png-wh_50

  wkiom1eqecuz2kbaaai3sz-lefk380.png-wh_50

  wkiol1eqecvzvp4qaaeas_empzi880.png-wh_50



阅读(1954) | 评论(0) | 转发(0) |
0

上一篇:弹性分布式深度学习系统

下一篇:没有了

给主人留下些什么吧!~~
")); function link(t){ var href= $(t).attr('href'); href ="?url=" encodeuricomponent(location.href); $(t).attr('href',href); //setcookie("returnouturl", location.href, 60, "/"); }
网站地图