Files @ 2c7d6c16e14f
Branch filter:

Location: rattail-project/rattail/tests/filemon/test_config.py

lance
File Monitor overhaul!

* New configuration syntax (old syntax still supported but deprecated).
* Class-based actions.
* Configure keyword arguments to action callables.
* Configure retry for actions.
* Add (some) tests, docs.
# -*- coding: utf-8 -*-

import os
from unittest import TestCase

from fixture import TempIO

from edbob.configuration import AppConfigParser

from rattail.filemon import config
from rattail.filemon import Action
from rattail.exceptions import ConfigurationError


class TestProfile(TestCase):
    """
    Tests for ``config.Profile`` using the new-style configuration syntax.
    """

    def setUp(self):
        # Each test begins with profile "foo" listing one action named "bar";
        # the tests themselves decide how (or whether) "bar" gets defined.
        self.config = AppConfigParser(u'rattail')
        self._configure(u'foo.actions', u'bar')

    def _configure(self, option, value):
        """
        Set ``option`` to ``value`` within the ``rattail.filemon`` section.
        """
        self.config.set(u'rattail.filemon', option, value)

    def _load_single_action(self):
        """
        Build the "foo" profile, verify that it holds exactly one action,
        and return that action.
        """
        foo = config.Profile(self.config, u'foo')
        self.assertEqual(len(foo.actions), 1)
        return foo.actions[0]

    def test_empty_config_means_empty_profile(self):
        """
        A profile key with no config behind it yields default settings.
        """
        empty = config.Profile(self.config, u'nonexistent_key')
        self.assertEqual(len(empty.dirs), 0)
        self.assertFalse(empty.watch_locks)
        self.assertTrue(empty.process_existing)
        self.assertFalse(empty.stop_on_error)
        self.assertEqual(len(empty.actions), 0)

    def test_action_must_specify_callable(self):
        """
        Action "bar" is listed but has neither ``func`` nor ``class``.
        """
        self.assertRaises(ConfigurationError,
                          config.Profile, self.config, u'foo')

    def test_action_must_not_specify_both_func_and_class_callables(self):
        """
        Defining both ``class`` and ``func`` for one action is an error.
        """
        for kind in (u'class', u'func'):
            self._configure(u'foo.action.bar.' + kind, u'baz')
        self.assertRaises(ConfigurationError,
                          config.Profile, self.config, u'foo')

    def test_action_with_func_callable(self):
        """
        A ``func`` spec is kept on the action and resolved to the function.
        """
        self._configure(u'foo.action.bar.func', u'os:remove')
        loaded = self._load_single_action()
        self.assertEqual(loaded.spec, u'os:remove')
        self.assertTrue(loaded.action is os.remove)

    def test_action_with_class_callable(self):
        """
        A ``class`` spec results in an instance of the named class.
        """
        self._configure(u'foo.action.bar.class', u'rattail.filemon:Action')
        loaded = self._load_single_action()
        self.assertEqual(loaded.spec, u'rattail.filemon:Action')
        self.assertTrue(isinstance(loaded.action, Action))

    def test_action_with_args(self):
        """
        The ``args`` option supplies positional arguments for the action.
        """
        self._configure(u'foo.action.bar.func', u'shutil:move')
        self._configure(u'foo.action.bar.args', u'/dev/null')
        loaded = self._load_single_action()
        self.assertEqual(len(loaded.args), 1)
        self.assertEqual(loaded.args[0], u'/dev/null')

    def test_action_with_kwargs(self):
        """
        Each ``kwarg.<name>`` option supplies one keyword argument.
        """
        self._configure(u'foo.action.bar.func',
                        u'rattail.filemon.actions:raise_exception')
        self._configure(u'foo.action.bar.kwarg.message', u"Hello World")
        loaded = self._load_single_action()
        self.assertEqual(len(loaded.kwargs), 1)
        self.assertEqual(loaded.kwargs[u'message'], u"Hello World")

    def test_action_with_default_retry(self):
        """
        With no retry options, the action gets one attempt and no delay.
        """
        self._configure(u'foo.action.bar.func',
                        u'rattail.filemon.actions:noop')
        loaded = self._load_single_action()
        self.assertEqual(loaded.retry_attempts, 1)
        self.assertEqual(loaded.retry_delay, 0)

    def test_action_with_valid_configured_retry(self):
        """
        Valid retry options are read from config as integers.
        """
        self._configure(u'foo.action.bar.func',
                        u'rattail.filemon.actions:noop')
        self._configure(u'foo.action.bar.retry_attempts', u'42')
        self._configure(u'foo.action.bar.retry_delay', u'100')
        loaded = self._load_single_action()
        self.assertEqual(loaded.retry_attempts, 42)
        self.assertEqual(loaded.retry_delay, 100)

    def test_action_with_invalid_configured_retry(self):
        """
        Negative retry options fall back to one attempt and no delay.
        """
        self._configure(u'foo.action.bar.func',
                        u'rattail.filemon.actions:noop')
        for option in (u'retry_attempts', u'retry_delay'):
            self._configure(u'foo.action.bar.' + option, u'-1')
        loaded = self._load_single_action()
        self.assertEqual(loaded.retry_attempts, 1)
        self.assertEqual(loaded.retry_delay, 0)

    def test_normalize_dirs(self):
        """
        Only paths which exist and are directories remain in ``dirs``.
        """
        tmp = TempIO()
        kept = tmp.mkdir(u'dir1')
        # Created and immediately removed: expected to be pruned because it
        # does not exist by the time the profile is built.
        missing = tmp.mkdir(u'dir2')
        os.rmdir(missing)
        # A regular file: expected to be pruned because it is not a directory.
        plain_file = tmp.putfile(u'file1', u'')
        self._configure(u'foo.action.bar.func', u'os:remove')
        quoted = []
        for path in (kept, missing, plain_file):
            quoted.append(u'"{0}"'.format(path))
        self._configure(u'foo.dirs', u' '.join(quoted))
        profile = config.Profile(self.config, u'foo')
        self.assertEqual(len(profile.dirs), 1)
        self.assertEqual(profile.dirs[0], kept)


class TestLoadProfiles(TestCase):
    """
    Tests for ``config.load_profiles()``.
    """

    def setUp(self):
        # Two identical new-style profiles, "foo" and "bar", each watching
        # the same temp folder and each with one "delete" action.
        self.tmp = TempIO()
        self.config = AppConfigParser(u'rattail')
        self._configure(u'monitor', u'foo, bar')
        for key in (u'foo', u'bar'):
            self._configure(u'{0}.dirs'.format(key),
                            u'"{0}"'.format(self.tmp))
            self._configure(u'{0}.actions'.format(key), u'delete')
            self._configure(u'{0}.action.delete.func'.format(key),
                            u'os:remove')

    def _configure(self, option, value):
        """
        Set ``option`` to ``value`` within the ``rattail.filemon`` section.
        """
        self.config.set(u'rattail.filemon', option, value)

    def _load_expecting(self, count):
        """
        Load profiles from the current config, verify that ``count`` of
        them came back, and return the loaded collection.
        """
        loaded = config.load_profiles(self.config)
        self.assertEqual(len(loaded), count)
        return loaded

    def test_returns_all_profiles_specified_in_monitor_option(self):
        self._load_expecting(2)
        # Profile definitions stay put; only the monitor list shrinks.
        self._configure(u'monitor', u'foo')
        self._load_expecting(1)

    def test_monitor_option_must_be_specified(self):
        self.config.remove_option(u'rattail.filemon', u'monitor')
        self.assertRaises(ConfigurationError,
                          config.load_profiles, self.config)

    def test_profiles_which_define_no_watched_folders_are_pruned(self):
        self._load_expecting(2)
        # Without any watched folder(s), "foo" should be dropped.
        self.config.remove_option(u'rattail.filemon', u'foo.dirs')
        self._load_expecting(1)

    def test_profiles_which_define_no_actions_are_pruned(self):
        self._load_expecting(2)
        # Without any actions, "foo" should be dropped.
        self.config.remove_option(u'rattail.filemon', u'foo.actions')
        self._load_expecting(1)

    def test_fallback_to_legacy_mode(self):
        # Swap the 'monitor' option for 'monitored', and rewrite both
        # profiles using the legacy (Python literal) syntax.
        self.config.remove_option(u'rattail.filemon', u'monitor')
        self._configure(u'monitored', u'foo, bar')
        for key in (u'foo', u'bar'):
            self._configure(u'{0}.dirs'.format(key),
                            u"['{0}']".format(self.tmp))
            self._configure(u'{0}.actions'.format(key), u"['os:remove']")
        loaded = self._load_expecting(2)
        # Both of the returned profiles must be of the legacy type.
        for profile in list(loaded.values()):
            self.assertTrue(isinstance(profile, config.LegacyProfile))


class TestLegacyProfile(TestCase):
    """
    Tests for ``config.LegacyProfile`` using the legacy configuration syntax.
    """

    def setUp(self):
        self.config = AppConfigParser(u'rattail')

    def _configure(self, option, value):
        """
        Set ``option`` to ``value`` within the ``rattail.filemon`` section.
        """
        self.config.set(u'rattail.filemon', option, value)

    def test_empty_config_means_empty_profile(self):
        """
        A profile key with no config behind it yields default settings.
        """
        empty = config.LegacyProfile(self.config, u'nonexistent_key')
        self.assertEqual(len(empty.dirs), 0)
        self.assertFalse(empty.watch_locks)
        self.assertTrue(empty.process_existing)
        self.assertFalse(empty.stop_on_error)
        self.assertEqual(len(empty.actions), 0)

    def test_action_with_spec_only(self):
        """
        A bare spec string becomes one action entry with a resolved callable.
        """
        self._configure(u'foo.actions', u"['os:remove']")
        legacy = config.LegacyProfile(self.config, u'foo')
        self.assertEqual(len(legacy.actions), 1)
        # Legacy actions are unpacked here as 4-item sequences.
        spec, func, args, kwargs = legacy.actions[0]
        self.assertEqual(spec, u'os:remove')
        self.assertTrue(func is os.remove)

    def test_action_with_spec_and_args(self):
        """
        A ``(spec, arg)`` tuple becomes an action entry carrying that arg.
        """
        self._configure(u'foo.actions', u"[('shutil:move', u'/dev/null')]")
        legacy = config.LegacyProfile(self.config, u'foo')
        self.assertEqual(len(legacy.actions), 1)
        spec, func, args, kwargs = legacy.actions[0]
        self.assertEqual(spec, u'shutil:move')
        self.assertEqual(len(args), 1)
        self.assertEqual(args[0], u'/dev/null')

    def test_normalize_dirs(self):
        """
        Only paths which exist and are directories remain in ``dirs``.
        """
        tmp = TempIO()
        kept = tmp.mkdir(u'dir1')
        # Created and immediately removed: expected to be pruned because it
        # does not exist by the time the profile is built.
        missing = tmp.mkdir(u'dir2')
        os.rmdir(missing)
        # A regular file: expected to be pruned because it is not a directory.
        plain_file = tmp.putfile(u'file1', u'')
        quoted = []
        for path in (kept, missing, plain_file):
            quoted.append(u"'{0}'".format(path))
        self._configure(u'foo.dirs', u"[{0}]".format(u', '.join(quoted)))
        legacy = config.LegacyProfile(self.config, u'foo')
        self.assertEqual(len(legacy.dirs), 1)
        self.assertEqual(legacy.dirs[0], kept)


class TestLoadLegacyProfiles(TestCase):
    """
    Tests for ``config.load_legacy_profiles()``.
    """

    def setUp(self):
        # Two identical legacy-style profiles, "foo" and "bar", each watching
        # the same temp folder and each with a single 'os:remove' action.
        self.tmp = TempIO()
        self.config = AppConfigParser(u'rattail')
        self._configure(u'monitored', u'foo, bar')
        for key in (u'foo', u'bar'):
            self._configure(u'{0}.dirs'.format(key),
                            u"['{0}']".format(self.tmp))
            self._configure(u'{0}.actions'.format(key), u"['os:remove']")

    def _configure(self, option, value):
        """
        Set ``option`` to ``value`` within the ``rattail.filemon`` section.
        """
        self.config.set(u'rattail.filemon', option, value)

    def _load_expecting(self, count):
        """
        Load legacy profiles from the current config, verify that ``count``
        of them came back, and return the loaded collection.
        """
        loaded = config.load_legacy_profiles(self.config)
        self.assertEqual(len(loaded), count)
        return loaded

    def test_returns_all_profiles_specified_in_monitor_option(self):
        self._load_expecting(2)
        # Profile definitions stay put; only the monitored list shrinks.
        self._configure(u'monitored', u'foo')
        self._load_expecting(1)

    def test_monitor_option_must_be_specified(self):
        self.config.remove_option(u'rattail.filemon', u'monitored')
        self.assertRaises(ConfigurationError,
                          config.load_legacy_profiles, self.config)

    def test_profiles_which_define_no_watched_folders_are_pruned(self):
        self._load_expecting(2)
        # Without any watched folder(s), "foo" should be dropped.
        self.config.remove_option(u'rattail.filemon', u'foo.dirs')
        self._load_expecting(1)

    def test_profiles_which_define_no_actions_are_pruned(self):
        self._load_expecting(2)
        # Without any actions, "foo" should be dropped.
        self.config.remove_option(u'rattail.filemon', u'foo.actions')
        self._load_expecting(1)


class TestParseList(TestCase):
    """
    Tests for ``config.parse_list()``.
    """

    def _check(self, text, expected):
        """
        Parse ``text`` and verify the result has the same length as
        ``expected``, and the same item at each index.
        """
        parsed = config.parse_list(text)
        self.assertEqual(len(parsed), len(expected))
        for index, item in enumerate(expected):
            self.assertEqual(parsed[index], item)

    def test_none(self):
        self._check(None, [])

    def test_single_value(self):
        self._check(u'foo', [u'foo'])

    def test_single_value_padded_by_spaces(self):
        self._check(u'   foo   ', [u'foo'])

    def test_slash_is_not_a_separator(self):
        self._check(u'/dev/null', [u'/dev/null'])

    def test_multiple_values_separated_by_whitespace(self):
        self._check(u'foo bar baz', [u'foo', u'bar', u'baz'])

    def test_multiple_values_separated_by_commas(self):
        self._check(u'foo,bar,baz', [u'foo', u'bar', u'baz'])

    def test_multiple_values_separated_by_whitespace_and_commas(self):
        self._check(u'  foo,   bar   baz', [u'foo', u'bar', u'baz'])

    def test_multiple_values_separated_by_whitespace_and_commas_with_some_quoting(self):
        # The quoted entry contains both spaces and a comma, neither of
        # which may be treated as a separator.
        text = u"""
        foo
        "C:\\some path\\with spaces\\and, a comma",
        baz
"""
        self._check(text, [
            u'foo',
            u'C:\\some path\\with spaces\\and, a comma',
            u'baz',
        ])