Testing FAM with Python
Posted by sky | Tags: Linux, Open-Source, Python, testing
There is a Python module, python-fam, that wraps the FAM client library. It crashes with a segmentation fault when a Python FAM object is deleted, which can be confirmed with GDB. After replacing PyMem_DEL with PyObject_Del in _fam.c and rebuilding the extension, the segmentation fault goes away. (Should I submit this as a bug?) Here is the test code, test_fam.py, that I wrote.
# test_fam.py
r"""
Exercise the python-fam ``_fam`` extension module.

Registers FAM monitors for the paths given on the command line and prints
every event that arrives, looping until select() reports an error.

Usage: test_fam.py file1 file2 ..
"""
import _fam
import select
from os import path
import sys
# Event codes copied from fam.h; the C declaration is kept in the string
# below for reference.
"""enum FAMCodes { FAMChanged=1, FAMDeleted=2, FAMStartExecuting=3,
FAMStopExecuting=4, FAMCreated=5, FAMMoved=6,
FAMAcknowledge=7, FAMExists=8, FAMEndExist=9 };"""
FAMChanged=1
FAMDeleted=2
FAMStartExecuting=3
FAMStopExecuting=4
FAMCreated=5
# FAMMoved is the only code from the enum left undefined in this file.
#FAMMoved=6
FAMAcknowledge=7
FAMExists=8
FAMEndExist=9
# Connection object returned by _fam.open(); opened at import time and shared
# by every function in this file.
_FM = _fam.open()
# filename -> list of notify callbacks registered through monitor().
_Monitors = {}
# filename -> request object returned by monitorFile()/monitorDirectory().
_MonitorReqs = {}
def _notify(filename, change, code):
    """Print a change notification as ``<filename> <change>``.

    filename -- path the event refers to.
    change   -- textual description of the event.
    code     -- numeric event code; accepted so the signature matches the
                other notify callbacks, but it is not printed.
    """
    # One pre-formatted string produces the same output under the Python 2
    # print statement and the Python 3 print function.
    print("%s %s" % (filename, change))
def monitor(filename, notify):
    """Register `notify` to be called for FAM events on `filename`.

    The first registration for a path asks FAM to monitor it: directories go
    through monitorDirectory() with the path itself as user data, everything
    else through monitorFile() with None as user data.  Later registrations
    for the same path only add the callback to the existing list.

    filename -- path to watch.
    notify   -- callable invoked as notify(filename, change, code).

    Any exception raised by the FAM request propagates to the caller, and in
    that case no bookkeeping entry is left behind.
    """
    callbacks = _Monitors.get(filename)
    if callbacks is not None:
        callbacks.append(notify)
        return

    # Issue the FAM request before touching the tables: if it raises, the
    # path must not look "already monitored" to the next monitor() call.
    if path.isdir(filename):
        request = _FM.monitorDirectory(filename, filename)
    else:
        request = _FM.monitorFile(filename, None)
    _MonitorReqs[filename] = request
    _Monitors[filename] = [notify]
def unmonitor(filename, notify):
    """Remove `notify` from the callbacks registered for `filename`.

    When the last callback for a path is removed, the FAM request for that
    path is cancelled and both bookkeeping entries are dropped.  Unknown
    paths are ignored.  Always returns None.

    filename -- path previously passed to monitor().
    notify   -- the callback object that was registered.
    """
    callbacks = _Monitors.get(filename)
    if callbacks is None:
        return

    try:
        callbacks.remove(notify)
    except ValueError as e:
        # The callback was never registered for this path; report it and
        # leave the other callbacks untouched.
        print(e)
        return

    if callbacks:
        return

    # Drop the bookkeeping before cancelling, so a failing cancel cannot
    # leave an empty callback list that makes the path look monitored.
    del _Monitors[filename]
    request = _MonitorReqs.pop(filename, None)
    if request is None:
        return
    try:
        request.cancelMonitor()
    except Exception as e:
        # Best effort: the exception types raised by the extension are not
        # visible here, so report the failure instead of crashing.
        print(e)
def _notify_event(fe):
    """Dispatch one FAM event to the callbacks registered for its path.

    fe -- event object; this function reads its filename, userData, code,
          requestID and hostname attributes and calls its code2str() method.

    If the event's filename is not a monitored path, it is treated as an
    entry of a monitored directory: the directory is taken from fe.userData
    and the callbacks receive the joined path.  Events that match nothing are
    dropped; an unmatched absolute path is also reported on stdout.
    """
    filename = fe.filename
    if filename in _Monitors:
        notifys = _Monitors[filename]
    else:
        if path.isabs(filename):
            print("? not monitor %s" % (filename,))
            return
        # Relative name: look the event up by the directory stored as user
        # data when the directory monitor was created.
        adir = fe.userData
        if adir not in _Monitors:
            return
        notifys = _Monitors[adir]
        filename = path.join(adir, filename)

    ch, code = fe.code2str(), fe.code
    print("%s %s %s %s %s" % (code, ch, fe.requestID, fe.userData,
                              fe.hostname))
    # Iterate over a snapshot: callbacks may call unmonitor(), which removes
    # entries from the live list and would make the loop skip a callback.
    for n in list(notifys):
        n(filename, ch, code)
def run():
    """Wait for FAM events and dispatch them until select() fails.

    Each pass blocks in select() with the FAM connection as the only object
    in the read set, then drains every pending event through
    _notify_event().  A select.error is printed and ends the loop.
    """
    read_set, empty = [_FM], []
    while True:
        try:
            # The returned ready lists are not needed; pending() is polled.
            select.select(read_set, empty, empty)
        except select.error as err:
            print(err)
            break
        while _FM.pending():
            print("check nextEvent")
            _notify_event(_FM.nextEvent())
#----------------------------------------------------------------------
def _unmonitor_onnotify(ecode):
    """Build a notify callback that unregisters itself when `ecode` arrives.

    The returned callable takes (filename, change, code).  When `code`
    equals `ecode` it calls unmonitor(filename, <itself>) first.  Unless that
    call returns a true value, the event is then reported through _notify().
    unmonitor() in this file returns None, so the event is always reported.
    """
    def notify(filename, change, code):
        if code == ecode:
            outcome = unmonitor(filename, notify)
            if outcome:
                return outcome
        return _notify(filename, change, code)

    return notify
def _test(files):
    """Monitor every path in `files` and enter the event loop.

    Each path gets two self-removing callbacks, one that unregisters on
    FAMDeleted and one that unregisters on FAMChanged, in that order.
    run() is then called.
    """
    for target in files:
        for stop_code in (FAMDeleted, FAMChanged):
            monitor(target, _unmonitor_onnotify(stop_code))
    run()
#----------------------------------------------------------------------
if __name__ == '__main__':
    # Everything after the script name is a path to monitor; with none
    # given, print the usage line and exit with status 0.
    targets = sys.argv[1:]
    if not targets:
        print("test_fam.py file1 file2 ..")
        sys.exit(0)
    _test(targets)
Manual pages for fam.
Related: FAM Non-superuser Hack.
Previous Post
Next Post