管理系统 celery采集器的设计

设计原因
1.由于客户存在多个商业级运维系统,每个运维系统都完全分开,需要每天登入各个运维系统查看是否存在告警。
2.商业级的运维系统毕竟是通用化,无法完美匹配客户的业务需求,需要自己对商业的运维系统做扩展。
3.商业级的运维系统有时候存在一些bug,对于少数设备无法进行监控和采集。
采集信息详情
- 采集存储数据
- 主机wwn,主机组,LUN组,LUN,端口组,端口等等信息
- 采集光交数据
- 光交硬件信息,光交wwn信息,光交端口错误,性能信息
- 采集vmware虚拟化数据
- 采集vmware主机,宿主机,存储信息
- 采集snmp数据
- 待定
- 接收 snmp trap数据
- 通过trap服务接口,采集以上设备的告警信息,并写入告警系统
- 分布式存储采集
- 采集分布式存储的容量信息,卷组,LUN等等信息
- 其他采集
- 直接通过商业运维系统的数据库进行采集,采集需要的数据进行处理后进行存储。
celery的安装及设置
安装celery
1.直接pip install celery
进行安装。安装redis数据库,配置redis数据库密码。
2.创建项目目录,在目录下创建 conf文件夹。在conf内创建conf.py文件,存储一些配置信息
3.创建celery_run.py
4.创建其他目录,每个目录对应一套业务系统。
from __future__ import absolute_import,unicode_literals
from celery import Celery,platforms
from celery.schedules import crontab
from conf.conf import logger,redis_ip,redis_port,redis_pass,redis_broker,redis_backend
import sys
from os import path
#允许root用户运行
platforms.C_FORCE_ROOT = True
#设置最高40个任务同时运行
CELERYD_MAX_TASKS_PER_CHILD = 40
#设置本地时间
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
#采用redis进行处理
app = Celery(
'manager',
#发布任务
broker='redis://:%s@%s:%s/%s'%(redis_pass,redis_ip,redis_port,redis_broker),
#返回任务结果
backend='redis://:%s@%s:%s/%s'%(redis_pass,redis_ip,redis_port,redis_backend),
#在每个应用目录下创建tasks.py,并将其设置到下面
include=['Brocade.tasks','fst.tasks','alert_system.tasks'])
# celery配置存放
app.conf.update(
#返回任务结果存储1小时
result_expires=3600,
#设置序列模式
task_serializer = 'pickle',
# result_serializer = 'pickle',
accept_content =['pickle'],
# accept_content =['application/json', 'pickle'],
#设置本地时间,启用定时任务时使用
CELERY_TIMEZONE = 'Asia/Shanghai',
CELERY_ENABLE_UTC = False,
)
# 定时任务设置
app.conf.beat_schedule = {
'multiply-at-some-time': {
'task': 'alert_system.tasks.printtest',
'schedule': crontab(hour=12, minute=00), # 每天早上 6 点 00 分执行一次
'args': () # 任务函数参数
}
}
app.conf.timezone = "GMT"
if __name__ == '__main__':
try:
app.start()
except Exception as e:
logger.warning("启动失败%s"%(e))
创建完成后可以直接进行启动测试。celery -A celery_run worker -l info,启动成功正常。
mongodb数据库的设置。
在conf下创建mongodb.py,将数据库的操作全部放置到这里面,其他文件专注业务逻辑。需要先安装monogdb,pip install pymongo
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
from urllib.parse import quote_plus
import sys,datetime
from pymongo import MongoClient
try:
from collect_manager.conf.conf import mong_ip,mong_user,mong_password,mong_app
except Exception:
from .conf.conf import mong_ip, mong_user, mong_password, mong_app
#接收log日志使用。
class Logsystem():
def __init__(self):
self.obj = monogdb().use_logsystem()
def warn(self,info):
data = {
"grade":"warn",
"messages":info,
"datetime":datetime.datetime.now(),
"confirm": False,
}
self.obj.insert(data)
def error(self,info):
data = {
"grade":"error",
"messages":info,
"datetime":datetime.datetime.now(),
"confirm":False,
}
self.obj.insert(data)
def info(self,info):
data = {
"grade":"info",
"messages":info,
"datetime":datetime.datetime.now(),
"confirm":True,
}
self.obj.insert(data)
def fail(self,info):
data = {
"grade":"fail",
"messages":info,
"datetime":datetime.datetime.now(),
"confirm":False,
}
self.obj.insert(data)
# 登入monogdb使用
class monogdb():
def __init__(self):
"""
初始化登入系统
"""
self.url = "mongodb://%s:%s@%s" % (quote_plus(mong_user),quote_plus(mong_password),mong_ip)
self.client = MongoClient(self.url,connect=False)
self.app = mong_app
def Logindatabase(self):
"""
返回登入得数据库
:return:
"""
dataname = self.client[self.app]
return dataname
def use_bro_san_tables(self):
"""
使用bro_san表
:return:
"""
obj = self.Logindatabase()
return obj["bro_san"]
def use_soft_log_tables(self):
obj = self.Logindatabase()
return obj["soft_log"]
def use_test_tables(self):
obj = self.Logindatabase()
return obj["test"]
def use_virtual_machine_tables(self):
obj = self.Logindatabase()
return obj["virtual_machine"]
def use_vmwarestorage_tables(self):
obj = self.Logindatabase()
return obj["vmware_storage"]
def use_vmwarehost_tables(self):
obj = self.Logindatabase()
return obj["vmware_host"]
def use_logsystem(self):
"""
类型:
messages
qrade 错误类型,debug info warn error fail
is_confirm 是否确认
is_delete 是否删除
label 标注,确认是否为重复信息,采集mysql id信息。
datetime 日期
:return:
"""
obj = self.Logindatabase()
return obj["log_system"]
snmp trap的接收服务设置
负责接收trap的信息,由于 设备种类太多,没必要对MIB做解析,直接写入设备IP到日志系统内。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import sys
sys.path.append(".")
from pysnmp.entity import engine, config
from pysnmp.proto import api
from pyasn1.codec.ber import decoder
from pysnmp.carrier.asynsock.dgram import udp, udp6
from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher
from pysnmp.entity.rfc3413 import ntfrcv
try:
from monogdb_api import Logsystem
except:
from .monogdb_api import Logsystem
Logsystem = Logsystem()
snmpEngine = engine.SnmpEngine()
def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
while wholeMsg:
msgVer = int(api.decodeMessageVersion(wholeMsg))
if msgVer in api.protoModules:
pMod = api.protoModules[msgVer]
else:
print('Unsupported SNMP version %s' % msgVer)
return
reqMsg, wholeMsg = decoder.decode(
wholeMsg, asn1Spec=pMod.Message(),
)
print('Notification message from %s:%s: ' % (
transportDomain, transportAddress
)
)
ipdress = transportAddress[0]
reqPDU = pMod.apiMessage.getPDU(reqMsg)
if reqPDU.isSameTypeWith(pMod.TrapPDU()):
if msgVer == api.protoVersion1:
try:
strlist = 'Enterprise: {},Agent Address: {},Generic Trap: {},Specific Trap: {},Uptime: {}'.format(
pMod.apiTrapPDU.getEnterprise(reqPDU).prettyPrint(),
pMod.apiTrapPDU.getAgentAddr(reqPDU).prettyPrint(),
pMod.apiTrapPDU.getGenericTrap(reqPDU).prettyPrint(),
pMod.apiTrapPDU.getSpecificTrap(reqPDU).prettyPrint(),
pMod.apiTrapPDU.getTimeStamp(reqPDU).prettyPrint()
)
print(strlist)
except Exception as e:
strlist = ("messages error,error({})").format(e)
varBinds = pMod.apiTrapPDU.getVarBindList(reqPDU)
else:
strlist= "error messages is not"
varBinds = pMod.apiPDU.getVarBindList(reqPDU)
Logsystem.error((u"ip ({}) is receive trap error, Please check the device details immediately.n {}").format(ipdress,strlist))
# print('Var-binds:')
# print(str(varBinds))
# for oid, val in varBinds:
# #a = oid.prettyPrint().strip()
# b = val.prettyPrint().strip().split('n')
# #print(a)
# for line in b:
# item = line.strip()
# if item.startswith('string-value'):
# print('string-value='+item.replace('string-value=0x','').decode('hex'))
# else:
# print(item)
return wholeMsg
if __name__ == '__main__':
transportDispatcher = AsynsockDispatcher()
transportDispatcher.registerRecvCbFun(cbFun)
# UDP/IPv4
transportDispatcher.registerTransport(
udp.domainName, udp.UdpSocketTransport().openServerMode(('0.0.0.0', 162))
)
# UDP/IPv6
transportDispatcher.registerTransport(
udp6.domainName, udp6.Udp6SocketTransport().openServerMode(('::1', 162))
)
transportDispatcher.jobStarted(1)
try:
# Dispatcher will never finish as job#1 never reaches zero
transportDispatcher.runDispatcher()
except:
transportDispatcher.closeDispatcher()
raise
存储数据的采集
在目录下创建应用目录storage,并在下面创建对应的子目录,比如华为存储设置huawei,富士通存储设置fst.并且在storage下创建tasks.py。存储数据通过web主动点击的方式进行采集。
设备品牌 | 采集方式 | 备注 |
---|---|---|
富士通 | ssh | 通过ssh进行数据采集 |
华为 | ssh | 通过ssh进行数据采集 |
日立 | web | 手工采集 |
EMC | web | 手工采集 |
这个代码相对简单,直接ssh到设备进行采集就行,对于一些低端的设备通过登入web进行采集。
光交数据的采集
每天采集一次,通过ssh的方式进行采集,采集后进行清洗存储。
设备品牌 | 采集方式 | 备注 |
---|---|---|
博科 | ssh | 通过ssh进行数据采集 |
思科 | ssh | 通过ssh进行数据采集 |
vmware虚拟化数据的采集
采集vmware的数据进行存储,对接客户的备份系统,明确查看虚拟化是否成功备份。
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import atexit
import requests
from pyVmomi import vim
from pyVim import connect
import json
from conf.conf import logger
def sizeof_fmt(num):
"""
Returns the human readable version of a file size
:param num:
:return:
"""
for item in ['bytes', 'KB', 'MB', 'GB']:
if num
利用supervisor进行进程管理
supervisor安装
1.apt-get install supervisor直接进行安装 。
2.echo_supervisord_conf > /etc/supervisord.conf,生成配置文件
3.配置:
key在配置文件底部添加
[include]
files=/etc/supervisor/*.conf #若你本地无/etc/supervisor目录,直接创建。
4.开机自启动
配置进程开机自启动“
chmod +x /etc/init.d/supervisord
# in debian based:
sudo update-rc.d supervisord defaults
# in redhat
chkconfig --add supervisord
设置开机启动:chkconfig supervisord on
查看是否成功:chkconfig --list | grep supervisord
- 配置程序,在/etc/supervisor内创建3个conf配置文件。并写入以下3个文件。
[program:sanmanager]
directory=/home/collect_manager
command= celery -A run_celery worker -l info
stdout_logfile=/home/collect_manager/run.log
user = root
autostart=true
autorestart=true
startsecs=60
stopasgroup=true
ikillasgroup=true
startretries=1
redirect_stderr=true
[program:sanmanagerbreat]
directory=/home/collect_manager
command= celery -A run_celery heart -l info
stdout_logfile=/home/collect_manager/breat.log
user = root
autostart=true
autorestart=true
startsecs=60
stopasgroup=true
ikillasgroup=true
startretries=1
redirect_stderr=true
[program:trapserver]
directory=/home/collect_manager
command= python3 trapserver.py
stdout_logfile=/home/collect_manager/trapserver.log
user = root
autostart=true
autorestart=true
startsecs=60
stopasgroup=true
ikillasgroup=true
startretries=1
redirect_stderr=true
6.用法
supervisorctl的用法
supervisord : 启动
supervisor supervisorctl reload :修改完配置文件后重新启动
supervisor supervisorctl status :查看supervisor监管的进程状态
supervisorctl start 进程名 :启动XXX进程
supervisorctl stop 进程名 :停止XXX进程
supervisorctl stop all:停止全部进程,注:start、restart、stop都不会载入最新的配置文件。
supervisorctl update:根据最新的配置文件,启动新配置或有改动的进程,配置没有改动的进程不会受影响而重启
文章来源于互联网,如有雷同请联系站长删除:管理系统 celery采集器的设计