liunx pyinotify的安装和使用

liunx pyinotify的安装和使用,第1张

概述介绍此功能是检测目录的 *** 作的事件 1.安装  在百度云盘下载或者在gits上下载安装包   链接:https://pan.baidu.com/s/1Lqt872YEgEo_bNPEnEJMaw   提取码:bjl2 # git clone https://github.com/seb-m/pyinotify.git # cd pyinotify/# ls# python set

介绍此功能是检测目录的 *** 作的事件

1.安装 

在百度云盘下载或者在gits上下载安装包

  链接:https://pan.baIDu.com/s/1Lqt872YEgEo_bNPEnEJMaw
  提取码:bjl2

# git clone https://github.com/seb-m/pyinotify.git     # cd pyinotify/# ls# python setup.py install

2.使用在代码中直接导入就可以

# -*- Coding: utf-8 -*-# !/usr/bin/env pythonimport osimport Queueimport datetimeimport pyinotifyimport logging#deBUG import threading,timepos = 0save_pos = 0filename =""   #filename1 =""pathname =""class Singleton(type):    def __call__(cls,*args,**kwargs):        if not hasattr(cls,instance):            cls.instance = super(Singleton,cls).__call__(*args,**kwargs)        return cls.instance    def __new__(cls,name,bases,dct):        return type.__new__(cls,dct)     def __init__(cls,dct):        super(Singleton,cls).__init__(name,dct)class AuditLog(object):    __Metaclass__ = Singleton    def __init__(self):        FORMAT = (%(asctime)s.%(msecs)d-%(levelname)s                  [%(filename)s:%(lineno)d:%(funcname)s]: %(message)s)        DATEFMT = %Y-%m-%d %H:%M:%s        logging.basicConfig(level=logging.DEBUG,format=FORMAT,datefmt=DATEFMT)        logging.deBUG("__init__")        self.log_queue = Queue.Queue(maxsize=1000)        self.str = "AuditLog"    def start(self,path):  #  path   as:  /var/log/auth.log        try:            print "start:",threading.currentThread()            if( not os.path.exists(path)):                logging.deBUG("文件路径不存在")                return            logging.deBUG("文件路径:{}".format(path))            global pathname            pathname = path            # # 输出前面的log            # printlog()            file =  path[:path.rfind(/)]                        global filename            global filename1            filename = path[path.rfind(/)+1:]            filename1 = filename + ".1"            print "filename:",filename,"    filename1 ",filename1            # watch manager            wm = pyinotify.WatchManager()            wm.add_watch(file,pyinotify.ALL_EVENTS,rec=True)            eh = MyEventHandler()            # notifIEr            notifIEr = pyinotify.NotifIEr(wm,eh)            notifIEr.loop()        except Exception as e:            logging.error([AuditLog]:send error: %s,str(e))            return    def read_log_file(self):        global pos        global save_pos        try:            # if( not os.pathname.exists(pathname)):            #     logging.deBUG("读取的文件不存在")            #     return            fd = open(pathname)            if pos != 0:                fd.seek(pos,0)            while True:                line = fd.readline()                if line.strip():                    logging.deBUG("put queue:{}".format(line.strip()))                    try:                        self.log_queue.put_Nowait(line.strip())                    except Queue.Full:                        logging.warn(send_log_queue is full Now)                                        pos = fd.tell()                    save_pos = pos                else:                    break            fd.close()        except Exception,e:            logging.error([AuditLog]:send error: %s,str(e))    def read_log1_file(self):                try:            global save_pos            global pos             pos = 0            pathname1 = pathname + ".1"            # if( not os.pathname1.exists(pathname1)):            #     logging.deBUG("读取的文件不存在")            #     return            fd = open(pathname1)            if save_pos != 0:                fd.seek(save_pos,0)            while True:                line = fd.readline()                if line.strip():                    if save_pos > fd.tell() :                        print save_pos,"   ",fd.tell()                        continue                     try:                        self.log_queue.put_Nowait(line.strip())                        logging.deBUG("put queue:{}".format(line.strip()))                    except Queue.Full:                        logging.warn(send_log_queue is full Now)                                    else:                    save_pos = 0                    break            fd.close()        except Exception,str(e))    def __del__(self):        print "del"class MyEventHandler(pyinotify.ProcessEvent):    print "MyEventHandler :",threading.currentThread()    def __init__(self):        print "MyEventHandler  __init__"        self.auditlogobject = AuditLog()        print self.auditlogobject.str    # 当文件被修改时调用函数    def process_IN_MODIFY(self,event):  #文件修改        print "process_IN_MODIFY",event.name    def process_IN_CREATE(self,event): #文件创建        # logging.deBUG("process_IN_CREATE")        print "create event:",event.name                #  log.log.1        global save_pos        try:            if(event.name == filename):                 print "=="                self.auditlogobject.read_log1_file()            else:                logging.deBUG("审计的文件不相同{}  {}".format(filename1).format(event.name))        except Exception as e:            logging.error([AuditLog]:send error: %s,str(e))     def process_IN_DELETE(self,event):   #文件删除        # logging.deBUG("process_IN_DELETE")        print "delete event:",event.name    def process_IN_ACCESS(self,event):  #访问        # logging.deBUG("process_IN_ACCESS")        print "ACCESS event:",event.name         def process_IN_ATTRIB(self,event):  #属性        # logging.deBUG("process_IN_ATTRIB")        print "ATTRIB event:  文件属性",event.name         def process_IN_CLOSE_NowRITE(self,event):        # logging.deBUG("process_IN_CLOSE_NowRITE")        print "CLOSE_NowRITE event:",event.name         def process_IN_CLOSE_WRITE(self,event):  # 关闭写入        # logging.deBUG("process_IN_CLOSE_WRITE")        print "CLOSE_WRITE event:",event.name        try:            if(event.name == filename):                self.auditlogobject.read_log_file()            else:                logging.deBUG("文件名不对 不是审计文件")                return        except Exception as e:            logging.error([AuditLog]:send error: %s,str(e))              def process_IN_OPEN(self,event):  # 打开        # logging.deBUG("process_IN_OPEN")        print "OPEN event:",event.namedef Producer():    try:        auditlogobj = AuditLog()        auditlogobj.start("./log/log.log")    except Exception as e:        logging.error([auditLog]:send error: %s,str(e))   def  Consumer():    try:        print "Consumer"        auditlogobject = AuditLog()        print auditlogobject.str        while True:            print "ddd"            time.sleep(1)                        while auditlogobject.log_queue.qsize() != 0 :                print "queue size",auditlogobject.log_queue.qsize()                print auditlogobject.log_queue.get()            # time.sleep(5)    except Exception as e:        logging.error([AuditLog]:send error: %s,str(e))if __name__ == __main__:      # a = threading.Thread(target=Producer,)    # a.start()    b = threading.Thread(target=Consumer,)    b.start()
总结

以上是内存溢出为你收集整理的liunx pyinotify的安装和使用全部内容,希望文章能够帮你解决liunx pyinotify的安装和使用所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/yw/1020387.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-05-23
下一篇2022-05-23

发表评论

登录后才能评论

评论列表(0条)

    保存