image.png

设计原因

1.由于客户存在多个商业级运维系统,每个运维系统都完全分开,需要每天登入各个运维系统查看是否存在告警。

2.商业级的运维系统毕竟是通用化,无法完美匹配客户的业务需求,需要自己对商业的运维系统做扩展。

3.商业级的运维系统有时候存在一些bug,对于少数设备无法进行监控和采集。

采集信息详情

  • 采集存储数据
    • 主机wwn,主机组,LUN组,LUN,端口组,端口等等信息
  • 采集光交数据
    • 光交硬件信息,光交wwn信息,光交端口错误,性能信息
  • 采集vmware虚拟化数据
    • 采集vmware主机,宿主机,存储信息
  • 采集snmp数据
    • 待定
  • 接收 snmp trap数据
    • 通过trap服务接口,采集以上设备的告警信息,并写入告警系统
  • 分布式存储采集
    • 采集分布式存储的容量信息,卷组,LUN等等信息
  • 其他采集
    • 直接通过商业运维系统的数据库进行采集,采集需要的数据进行处理后进行存储。

celery的安装及设置

安装celery

1.直接pip install celery进行安装。安装redis数据库,配置redis数据库密码。

2.创建项目目录,在目录下创建 conf文件夹。在conf内创建conf.py文件,存储一些配置信息

3.创建celery_run.py

4.创建其他目录,每个目录对应一套业务系统。

from __future__ import absolute_import,unicode_literals
from celery import Celery,platforms
from celery.schedules import crontab
from conf.conf import logger,redis_ip,redis_port,redis_pass,redis_broker,redis_backend
import sys
from os import path
#允许root用户运行
platforms.C_FORCE_ROOT = True


#设置最高40个任务同时运行
CELERYD_MAX_TASKS_PER_CHILD = 40


#设置本地时间
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False

#采用redis进行处理
app = Celery(
    'manager',
    #发布任务
    broker='redis://:%s@%s:%s/%s'%(redis_pass,redis_ip,redis_port,redis_broker),
    #返回任务结果
    backend='redis://:%s@%s:%s/%s'%(redis_pass,redis_ip,redis_port,redis_backend),
    #在每个应用目录下创建tasks.py,并将其设置到下面
    include=['Brocade.tasks','fst.tasks','alert_system.tasks'])

# celery配置存放
app.conf.update(
    #返回任务结果存储1小时
    result_expires=3600,
    #设置序列模式
    task_serializer = 'pickle',
    # result_serializer = 'pickle',
    accept_content =['pickle'],
    # accept_content =['application/json', 'pickle'],
    #设置本地时间,启用定时任务时使用
    CELERY_TIMEZONE = 'Asia/Shanghai',
    CELERY_ENABLE_UTC = False,
)

# 定时任务设置
app.conf.beat_schedule = {
    'multiply-at-some-time': {
        'task': 'alert_system.tasks.printtest',
        'schedule': crontab(hour=12, minute=00),   # 每天早上 6 点 00 分执行一次
        'args': ()                                  # 任务函数参数
    }

}
app.conf.timezone = "GMT"
if __name__ == '__main__':
    try:
        app.start()
    except Exception as e:
        logger.warning("启动失败%s"%(e))

创建完成后可以直接进行启动测试。celery -A celery_run worker -l info,启动成功正常。


mongodb数据库的设置。

在conf下创建mongodb.py,将数据库的操作全部放置到这里面,其他文件专注业务逻辑。需要先安装monogdb,pip install pymongo

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
from urllib.parse import quote_plus
import sys,datetime


from pymongo import MongoClient

try:
    from collect_manager.conf.conf import mong_ip,mong_user,mong_password,mong_app
except Exception:
    from .conf.conf import mong_ip, mong_user, mong_password, mong_app

#接收log日志使用。
class Logsystem():
    def __init__(self):
        self.obj = monogdb().use_logsystem()
    def warn(self,info):
        data = {
            "grade":"warn",
            "messages":info,
            "datetime":datetime.datetime.now(),
            "confirm": False,
        }
        self.obj.insert(data)
    def error(self,info):
        data = {
            "grade":"error",
            "messages":info,
            "datetime":datetime.datetime.now(),
            "confirm":False,
        }
        self.obj.insert(data)
    def info(self,info):
        data = {
            "grade":"info",
            "messages":info,
            "datetime":datetime.datetime.now(),
            "confirm":True,
        }
        self.obj.insert(data)
    def fail(self,info):
        data = {
            "grade":"fail",
            "messages":info,
            "datetime":datetime.datetime.now(),
            "confirm":False,
        }
        self.obj.insert(data)
        
        
# 登入monogdb使用
class monogdb():
    def __init__(self):
        """
        初始化登入系统
        """
        self.url = "mongodb://%s:%s@%s" % (quote_plus(mong_user),quote_plus(mong_password),mong_ip)
        self.client = MongoClient(self.url,connect=False)
        self.app = mong_app
    def Logindatabase(self):
        """
        返回登入得数据库
        :return:
        """
        dataname = self.client[self.app]
        return dataname
    def use_bro_san_tables(self):
        """
        使用bro_san表
        :return:
        """
        obj = self.Logindatabase()
        return obj["bro_san"]
    def use_soft_log_tables(self):
        obj = self.Logindatabase()
        return obj["soft_log"]
    def use_test_tables(self):
        obj = self.Logindatabase()
        return obj["test"]
    def use_virtual_machine_tables(self):
        obj = self.Logindatabase()
        return obj["virtual_machine"]
    def use_vmwarestorage_tables(self):
        obj = self.Logindatabase()
        return obj["vmware_storage"]
    def use_vmwarehost_tables(self):
        obj = self.Logindatabase()
        return obj["vmware_host"]
    def use_logsystem(self):
        """
            类型:
            messages
            qrade       错误类型,debug info warn  error fail
            is_confirm   是否确认
            is_delete   是否删除
            label       标注,确认是否为重复信息,采集mysql id信息。
            datetime  日期
        :return:
        """
        obj = self.Logindatabase()
        return obj["log_system"]


snmp trap的接收服务设置

负责接收trap的信息,由于 设备种类太多,没必要对MIB做解析,直接写入设备IP到日志系统内。

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import sys


sys.path.append(".")


from pysnmp.entity import engine, config
from pysnmp.proto import api

from pyasn1.codec.ber import decoder
from pysnmp.carrier.asynsock.dgram import udp, udp6

from pysnmp.carrier.asynsock.dispatch import AsynsockDispatcher

from pysnmp.entity.rfc3413 import ntfrcv
try:
    from monogdb_api import Logsystem
except:
    from .monogdb_api import Logsystem

Logsystem = Logsystem()

snmpEngine = engine.SnmpEngine()

def cbFun(transportDispatcher, transportDomain, transportAddress, wholeMsg):
    while wholeMsg:
        msgVer = int(api.decodeMessageVersion(wholeMsg))
        if msgVer in api.protoModules:
            pMod = api.protoModules[msgVer]
        else:
            print('Unsupported SNMP version %s' % msgVer)
            return
        reqMsg, wholeMsg = decoder.decode(
            wholeMsg, asn1Spec=pMod.Message(),
            )
        print('Notification message from %s:%s: ' % (
            transportDomain, transportAddress
            )
        )
        ipdress = transportAddress[0]
        reqPDU = pMod.apiMessage.getPDU(reqMsg)
        if reqPDU.isSameTypeWith(pMod.TrapPDU()):
            if msgVer == api.protoVersion1:
                try:
                    strlist = 'Enterprise: {},Agent Address: {},Generic Trap: {},Specific Trap: {},Uptime: {}'.format(
                        pMod.apiTrapPDU.getEnterprise(reqPDU).prettyPrint(),
                        pMod.apiTrapPDU.getAgentAddr(reqPDU).prettyPrint(),
                        pMod.apiTrapPDU.getGenericTrap(reqPDU).prettyPrint(),
                        pMod.apiTrapPDU.getSpecificTrap(reqPDU).prettyPrint(),
                        pMod.apiTrapPDU.getTimeStamp(reqPDU).prettyPrint()
                        )
                    print(strlist)
                except Exception as e:
                    strlist = ("messages error,error({})").format(e)
                varBinds = pMod.apiTrapPDU.getVarBindList(reqPDU)
            else:
                strlist= "error messages is not"
                varBinds = pMod.apiPDU.getVarBindList(reqPDU)
        Logsystem.error((u"ip ({}) is receive  trap error, Please check the device details immediately.n {}").format(ipdress,strlist))

            # print('Var-binds:')
            # print(str(varBinds))
    #         for oid, val in varBinds:
    #             #a = oid.prettyPrint().strip()
    #             b = val.prettyPrint().strip().split('n')
    #             #print(a)
    #             for line in b:
    #                 item = line.strip()
    #                 if item.startswith('string-value'):
    #                     print('string-value='+item.replace('string-value=0x','').decode('hex'))
    #                 else:
    #                     print(item)
    return wholeMsg


if __name__ == '__main__':
    transportDispatcher = AsynsockDispatcher()

    transportDispatcher.registerRecvCbFun(cbFun)

    # UDP/IPv4
    transportDispatcher.registerTransport(
        udp.domainName, udp.UdpSocketTransport().openServerMode(('0.0.0.0', 162))
    )

    # UDP/IPv6
    transportDispatcher.registerTransport(
        udp6.domainName, udp6.Udp6SocketTransport().openServerMode(('::1', 162))
    )

    transportDispatcher.jobStarted(1)

    try:
        # Dispatcher will never finish as job#1 never reaches zero
        transportDispatcher.runDispatcher()
    except:
        transportDispatcher.closeDispatcher()
        raise

存储数据的采集

在目录下创建应用目录storage,并在下面创建对应的子目录,比如华为存储设置huawei,富士通存储设置fst.并且在storage下创建tasks.py。存储数据通过web主动点击的方式进行采集。

设备品牌 采集方式 备注
富士通 ssh 通过ssh进行数据采集
华为 ssh 通过ssh进行数据采集
日立 web 手工采集
EMC web 手工采集

这个代码相对简单,直接ssh到设备进行采集就行,对于一些低端的设备通过登入web进行采集。

光交数据的采集

每天采集一次,通过ssh的方式进行采集,采集后进行清洗存储。

设备品牌 采集方式 备注
博科 ssh 通过ssh进行数据采集
思科 ssh 通过ssh进行数据采集

vmware虚拟化数据的采集

采集vmware的数据进行存储,对接客户的备份系统,明确查看虚拟化是否成功备份。

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import atexit
import requests
from pyVmomi import vim
from pyVim import connect
import json
from conf.conf import logger
def sizeof_fmt(num):
    """
    Returns the human readable version of a file size

    :param num:
    :return:
    """
    for item in ['bytes', 'KB', 'MB', 'GB']:
        if num 

利用supervisor进行进程管理

supervisor安装

1.apt-get install supervisor直接进行安装 。
2.echo_supervisord_conf > /etc/supervisord.conf,生成配置文件

3.配置:

    key在配置文件底部添加
        [include]
files=/etc/supervisor/*.conf #若你本地无/etc/supervisor目录,直接创建。

4.开机自启动

配置进程开机自启动“
    chmod +x /etc/init.d/supervisord
# in debian based:
sudo update-rc.d supervisord defaults
# in redhat
chkconfig --add supervisord
    设置开机启动:chkconfig supervisord on
    查看是否成功:chkconfig --list | grep supervisord
  1. 配置程序,在/etc/supervisor内创建3个conf配置文件。并写入以下3个文件。
[program:sanmanager]
directory=/home/collect_manager
command= celery -A run_celery worker -l info
stdout_logfile=/home/collect_manager/run.log
user = root
autostart=true
autorestart=true
startsecs=60
stopasgroup=true
ikillasgroup=true
startretries=1
redirect_stderr=true

[program:sanmanagerbreat]
directory=/home/collect_manager
command= celery -A run_celery heart -l info
stdout_logfile=/home/collect_manager/breat.log
user = root
autostart=true
autorestart=true
startsecs=60
stopasgroup=true
ikillasgroup=true
startretries=1
redirect_stderr=true

[program:trapserver]
directory=/home/collect_manager
command= python3 trapserver.py
stdout_logfile=/home/collect_manager/trapserver.log
user = root
autostart=true
autorestart=true
startsecs=60
stopasgroup=true
ikillasgroup=true
startretries=1
redirect_stderr=true

6.用法

supervisorctl的用法
supervisord : 启动
supervisor supervisorctl reload :修改完配置文件后重新启动
supervisor supervisorctl status :查看supervisor监管的进程状态 
supervisorctl start 进程名 :启动XXX进程 
supervisorctl stop 进程名 :停止XXX进程
 supervisorctl stop all:停止全部进程,注:start、restart、stop都不会载入最新的配置文件。 
supervisorctl update:根据最新的配置文件,启动新配置或有改动的进程,配置没有改动的进程不会受影响而重启

文章来源于互联网,如有雷同请联系站长删除:管理系统 celery采集器的设计

发表评论