diff --git a/tests/filemon/test_linux.py b/tests/filemon/test_linux.py new file mode 100644 index 0000000000000000000000000000000000000000..7bcf31a162f98cb6c8bba8e95d56d8d50f8056eb --- /dev/null +++ b/tests/filemon/test_linux.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- + +import Queue +from unittest import TestCase + +from mock import Mock +from fixture import TempIO + +from edbob.configuration import AppConfigParser + +from rattail.filemon import linux +from rattail.filemon.config import Profile + + +class TestEventHandler(TestCase): + + def setUp(self): + self.tmp = TempIO() + self.config = AppConfigParser(u'rattail') + self.config.set(u'rattail.filemon', u'monitor', u'foo') + self.config.set(u'rattail.filemon', u'foo.dirs', self.tmp) + self.config.set(u'rattail.filemon', u'foo.actions', u'noop') + self.config.set(u'rattail.filemon', u'foo.action.noop.func', u'rattail.filemon.actions:noop') + self.profile = Profile(self.config, u'foo') + self.profile.queue = Queue.Queue() + self.handler = linux.EventHandler() + self.handler.my_init(self.profile) + + def test_in_access_event_does_nothing(self): + event = Mock(pathname=self.tmp.putfile(u'file', u'')) + self.handler.process_IN_ACCESS(event) + self.assertTrue(self.profile.queue.empty()) + + def test_in_attrib_event_does_nothing(self): + event = Mock(pathname=self.tmp.putfile(u'file', u'')) + self.handler.process_IN_ATTRIB(event) + self.assertTrue(self.profile.queue.empty()) + + def test_in_create_event_does_nothing(self): + event = Mock(pathname=self.tmp.putfile(u'file', u'')) + self.handler.process_IN_CREATE(event) + self.assertTrue(self.profile.queue.empty()) + + def test_in_modify_event_does_nothing(self): + event = Mock(pathname=self.tmp.putfile(u'file', u'')) + self.handler.process_IN_MODIFY(event) + self.assertTrue(self.profile.queue.empty()) + + def test_in_close_write_event_queues_file_if_profile_does_not_watch_locks(self): + event = Mock(pathname=self.tmp.putfile(u'file', u'')) + self.profile.watch_locks = False + self.handler.process_IN_CLOSE_WRITE(event) + self.assertEqual(self.profile.queue.qsize(), 1) + self.assertEqual(self.profile.queue.get_nowait(), self.tmp.join(u'file')) + + def test_in_close_write_event_does_nothing_if_profile_watches_locks(self): + event = Mock(pathname=self.tmp.putfile(u'file.lock', u'')) + self.profile.watch_locks = True + self.handler.process_IN_CLOSE_WRITE(event) + self.assertTrue(self.profile.queue.empty()) + + def test_in_moved_to_event_queues_file_if_profile_does_not_watch_locks(self): + event = Mock(pathname=self.tmp.putfile(u'file', u'')) + self.profile.watch_locks = False + self.handler.process_IN_MOVED_TO(event) + self.assertEqual(self.profile.queue.qsize(), 1) + self.assertEqual(self.profile.queue.get_nowait(), self.tmp.join(u'file')) + + def test_in_moved_to_event_does_nothing_if_profile_watches_locks(self): + event = Mock(pathname=self.tmp.putfile(u'file.lock', u'')) + self.profile.watch_locks = True + self.handler.process_IN_MOVED_TO(event) + self.assertTrue(self.profile.queue.empty()) + + def test_in_delete_event_queues_file_if_profile_watches_locks(self): + event = Mock(pathname=self.tmp.putfile(u'file.lock', u'')) + self.profile.watch_locks = True + self.handler.process_IN_DELETE(event) + self.assertEqual(self.profile.queue.qsize(), 1) + self.assertEqual(self.profile.queue.get_nowait(), self.tmp.join(u'file')) + + def test_in_moved_to_event_does_nothing_if_profile_does_not_watch_locks(self): + event = Mock(pathname=self.tmp.putfile(u'file', u'')) + self.profile.watch_locks = False + self.handler.process_IN_DELETE(event) + self.assertTrue(self.profile.queue.empty())