Changeset - 9694902d4cdd
[Not reviewed]
0 2 0
Lance Edgar - 9 years ago 2015-08-16 19:36:40
ledgar@sacfoodcoop.com
Remove some unused/unwanted command line arguments.

These weren't bad ideas, but hadn't really been implemented. Easier just
to get rid of them for now.
2 files changed with 1 insertions and 32 deletions:
0 comments (0 inline, 0 general)
rattail/commands.py
Show inline comments
 
@@ -3,345 +3,332 @@
 
#
 
#  Rattail -- Retail Software Framework
 
#  Copyright © 2010-2015 Lance Edgar
 
#
 
#  This file is part of Rattail.
 
#
 
#  Rattail is free software: you can redistribute it and/or modify it under the
 
#  terms of the GNU Affero General Public License as published by the Free
 
#  Software Foundation, either version 3 of the License, or (at your option)
 
#  any later version.
 
#
 
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
 
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 
#  FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
 
#  more details.
 
#
 
#  You should have received a copy of the GNU Affero General Public License
 
#  along with Rattail.  If not, see <http://www.gnu.org/licenses/>.
 
#
 
################################################################################
 
"""
 
Console Commands
 
"""
 

	
 
from __future__ import absolute_import
 
from __future__ import unicode_literals
 

	
 
import os
 
import sys
 
import platform
 
import argparse
 
import datetime
 
import socket
 
import shutil
 
import warnings
 
import logging
 
from getpass import getpass
 

	
 
import edbob
 

	
 
from ._version import __version__
 
from .util import load_entry_points
 
from .console import Progress
 
from rattail.config import RattailConfig
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ArgumentParser(argparse.ArgumentParser):
 
    """
 
    Custom argument parser.
 

	
 
    This overrides some of the parsing logic which is specific to the primary
 
    command object.
 
    """
 

	
 
    def parse_args(self, args=None, namespace=None):
 
        args, argv = self.parse_known_args(args, namespace)
 
        args.argv = argv
 
        return args
 

	
 

	
 
def date_argument(string):
 
    """
 
    Validate and coerce a date argument.
 

	
 
    This function is designed be used as the ``type`` parameter when calling
 
    ``ArgumentParser.add_argument()``, e.g.::
 

	
 
       parser = ArgumentParser()
 
       parser.add_argument('--date', type=date_argument)
 
    """
 
    try:
 
        date = datetime.datetime.strptime(string, '%Y-%m-%d').date()
 
    except ValueError:
 
        raise argparse.ArgumentTypeError("Date must be in YYYY-MM-DD format")
 
    return date
 

	
 

	
 
class Command(object):
 
    """
 
    The primary command for the application.
 

	
 
    This effectively *is* the ``rattail`` console application.  It mostly
 
    provides the structure for subcommands, which really do all the work.
 

	
 
    This command is designed to be subclassed, should your application need
 
    similar functionality.
 
    """
 
    name = 'rattail'
 
    version = __version__
 
    description = "Retail Software Framework"
 
    long_description = """
 
Rattail is a retail software framework.
 

	
 
Copyright (c) 2010-2012 Lance Edgar <lance@edbob.org>
 
Copyright (c) 2010-2015 Lance Edgar <lance@edbob.org>
 

	
 
This program comes with ABSOLUTELY NO WARRANTY.  This is free software,
 
and you are welcome to redistribute it under certain conditions.
 
See the file COPYING.txt for more information.
 
"""
 

	
 
    stdout = sys.stdout
 
    stderr = sys.stderr
 

	
 
    def __init__(self):
 
        self.subcommands = load_entry_points('{0}.commands'.format(self.name))
 

	
 
    def __unicode__(self):
 
        return unicode(self.name)
 

	
 
    @property
 
    def db_config_section(self):
 
        """
 
        Name of section in config file which should have database connection
 
        info.  This defaults to ``'rattail.db'`` but may be overridden so the
 
        command framework can sit in front of a non-Rattail database if needed.
 

	
 
        This is used to auto-configure a "default" database engine for the app,
 
        when any command is invoked.
 
        """
 
        return 'rattail.db'
 

	
 
    @property
 
    def db_session_factory(self):
 
        """
 
        Reference to the "primary" ``Session`` class, which will be configured
 
        automatically during app startup.  Defaults to :class:`rattail.db.Session`.
 
        """
 
        from rattail.db import Session
 
        return Session
 

	
 
    def iter_subcommands(self):
 
        """
 
        Iterate over the subcommands.
 

	
 
        This is a generator which yields each associated :class:`Subcommand`
 
        class sorted by :attr:`Subcommand.name`.
 
        """
 
        for name in sorted(self.subcommands):
 
            yield self.subcommands[name]
 

	
 
    def print_help(self):
 
        """
 
        Print help text for the primary command.
 

	
 
        The output will include a list of available subcommands.
 
        """
 
        self.stdout.write("""{0}
 

	
 
Usage: {1} [options] <command> [command-options]
 

	
 
Options:
 
  -c PATH, --config=PATH
 
                    Config path (may be specified more than once)
 
  -n, --no-init     Don't load config before executing command
 
  -d, --debug       Increase logging level to DEBUG
 
  -P, --progress    Show progress indicators (where relevant)
 
  -v, --verbose     Increase logging level to INFO
 
  -V, --version     Display program version and exit
 

	
 
Commands:\n""".format(self.description, self.name))
 

	
 
        for cmd in self.iter_subcommands():
 
            self.stdout.write("  {0:<16s}  {1}\n".format(cmd.name, cmd.description))
 

	
 
        self.stdout.write("\nTry '{0} help <command>' for more help.\n".format(self.name))
 

	
 
    def run(self, *args):
 
        """
 
        Parse command line arguments and execute appropriate subcommand.
 
        """
 
        parser = ArgumentParser(
 
            prog=self.name,
 
            description=self.description,
 
            add_help=False,
 
            )
 

	
 
        parser.add_argument('-c', '--config', action='append', dest='config_paths',
 
                            metavar='PATH')
 
        parser.add_argument('-d', '--debug', action='store_true', dest='debug')
 
        parser.add_argument('-n', '--no-init', action='store_true', default=False)
 
        parser.add_argument('-P', '--progress', action='store_true', default=False)
 
        parser.add_argument('--stdout', metavar='PATH', type=argparse.FileType('w'),
 
                            help="Optional path to which STDOUT should be effectively redirected.")
 
        parser.add_argument('--stderr', metavar='PATH', type=argparse.FileType('w'),
 
                            help="Optional path to which STDERR should be effectively redirected.")
 
        parser.add_argument('-v', '--verbose', action='store_true', dest='verbose')
 
        parser.add_argument('-V', '--version', action='version',
 
                            version="%(prog)s {0}".format(self.version))
 
        parser.add_argument('command', nargs='*')
 

	
 
        # Parse args and determine subcommand.
 
        args = parser.parse_args(list(args))
 
        if not args or not args.command:
 
            self.print_help()
 
            return
 

	
 
        # Show (sub)command help if so instructed, or unknown subcommand.
 
        cmd = args.command.pop(0)
 
        if cmd == 'help':
 
            if len(args.command) != 1:
 
                self.print_help()
 
                return
 
            cmd = args.command[0]
 
            if cmd not in self.subcommands:
 
                self.print_help()
 
                return
 
            cmd = self.subcommands[cmd](parent=self)
 
            cmd.parser.print_help()
 
            return
 
        elif cmd not in self.subcommands:
 
            self.print_help()
 
            return
 

	
 
        # Okay, we should be done needing to print help messages.  Now it's
 
        # safe to redirect STDOUT/STDERR, if necessary.
 
        if args.stdout:
 
            self.stdout = args.stdout
 
        if args.stderr:
 
            self.stderr = args.stderr
 

	
 
        # Basic logging should be established before init()ing.
 

	
 
        # Use root logger if setting logging flags.
 
        log = logging.getLogger()
 

	
 
        # TODO: Figure out the basic logging pattern we're after here.
 
        # edbob.basic_logging()
 
        logging.basicConfig()
 

	
 
        if args.verbose:
 
            log.setLevel(logging.INFO)
 
        if args.debug:
 
            log.setLevel(logging.DEBUG)
 

	
 
        # Initialize everything...
 
        config = None
 
        if not args.no_init:
 
            edbob.init(self.name, *(args.config_paths or []))
 
            config = RattailConfig(edbob.config)
 

	
 
            # Command line logging flags should override config.
 
            if args.verbose:
 
                log.setLevel(logging.INFO)
 
            if args.debug:
 
                log.setLevel(logging.DEBUG)
 

	
 
            # Configure the default database engine.
 
            try:
 
                from rattail.db.util import configure_session_factory
 
            except ImportError:
 
                pass            # assume no sqlalchemy
 
            else:
 
                configure_session_factory(config, section=self.db_config_section,
 
                                          session_factory=self.db_session_factory)
 

	
 
                # Maybe configure Continuum versioning.
 
                if (self.db_config_section == 'rattail.db'
 
                    and config.getboolean('rattail.db', 'versioning.enabled')):
 
                    from rattail.db.continuum  import configure_versioning
 
                    configure_versioning(config)
 

	
 
        # And finally, do something of real value...
 
        cmd = self.subcommands[cmd](self)
 
        cmd.config = config
 
        cmd.show_progress = args.progress
 
        cmd.progress = Progress if args.progress else None
 
        cmd._run(*(args.command + args.argv))
 

	
 

	
 
class Subcommand(object):
 
    """
 
    Base class for application subcommands.
 
    """
 
    name = 'UNDEFINED'
 
    description = 'UNDEFINED'
 

	
 
    def __init__(self, parent=None, show_progress=None):
 
        self.parent = parent
 
        self.stdout = getattr(parent, 'stdout', sys.stdout)
 
        self.stderr = getattr(parent, 'stderr', sys.stderr)
 
        self.show_progress = show_progress
 
        self.progress = Progress if show_progress else None
 
        self.parser = argparse.ArgumentParser(
 
            prog='{0} {1}'.format(getattr(self.parent, 'name', 'UNDEFINED'), self.name),
 
            description=self.description)
 
        self.add_parser_args(self.parser)
 

	
 
    def __repr__(self):
 
        return "Subcommand(name={0})".format(repr(self.name))
 

	
 
    def add_parser_args(self, parser):
 
        """
 
        Configure additional arguments for the subcommand argument parser.
 
        """
 
        pass
 
            
 
    def _run(self, *args):
 
        args = self.parser.parse_args(list(args))
 
        return self.run(args)
 

	
 
    def run(self, args):
 
        """
 
        Run the subcommand logic.
 
        """
 
        raise NotImplementedError
 

	
 

	
 
class AddUser(Subcommand):
 
    """
 
    Adds a user to the database.
 
    """
 

	
 
    name = 'adduser'
 
    description = "Add a user to the database."
 

	
 
    def add_parser_args(self, parser):
 
        parser.add_argument('username',
 
                            help="Username for the new account.")
 
        parser.add_argument('-A', '--administrator',
 
                            action='store_true',
 
                            help="Add the new user to the Administrator role.")
 

	
 
    def run(self, args):
 
        from rattail.db import Session
 
        from rattail.db import model
 
        from rattail.db.auth import set_user_password, administrator_role
 

	
 
        session = Session()
 

	
 
        if session.query(model.User).filter_by(username=args.username).count():
 
            session.close()
 
            self.stderr.write("User '{0}' already exists.\n".format(args.username))
 
            return
 

	
 
        passwd = ''
 
        while not passwd:
 
            try:
 
                passwd = getpass("Enter a password for user '{0}': ".format(args.username))
 
            except KeyboardInterrupt:
 
                self.stderr.write("\nOperation was canceled.\n")
 
                return
 

	
tests/test_commands.py
Show inline comments
 
@@ -41,210 +41,192 @@ class TestDateArgument(TestCase):
 
    def test_valid_date_string_returns_date_object(self):
 
        date = commands.date_argument('2014-01-01')
 
        self.assertEqual(date, datetime.date(2014, 1, 1))
 

	
 
    def test_invalid_date_string_raises_error(self):
 
        self.assertRaises(argparse.ArgumentTypeError, commands.date_argument, 'invalid-date')
 

	
 

	
 
class TestCommand(TestCase):
 

	
 
    def test_initial_subcommands_are_sane(self):
 
        command = commands.Command()
 
        self.assertTrue('filemon' in command.subcommands)
 

	
 
    def test_unicode(self):
 
        command = commands.Command()
 
        command.name = 'some-app'
 
        self.assertEqual(unicode(command), u'some-app')
 
        
 
    def test_iter_subcommands_includes_expected_item(self):
 
        command = commands.Command()
 
        found = False
 
        for subcommand in command.iter_subcommands():
 
            if subcommand.name == 'filemon':
 
                found = True
 
                break
 
        self.assertTrue(found)
 

	
 
    def test_print_help(self):
 
        command = commands.Command()
 
        stdout = StringIO()
 
        command.stdout = stdout
 
        command.print_help()
 
        output = stdout.getvalue()
 
        stdout.close()
 
        self.assertTrue('Usage:' in output)
 
        self.assertTrue('Options:' in output)
 

	
 
    def test_run_with_no_args_prints_help(self):
 
        command = commands.Command()
 
        with patch.object(command, 'print_help') as print_help:
 
            command.run()
 
            print_help.assert_called_once_with()
 

	
 
    def test_run_with_single_help_arg_prints_help(self):
 
        command = commands.Command()
 
        with patch.object(command, 'print_help') as print_help:
 
            command.run('help')
 
            print_help.assert_called_once_with()
 

	
 
    def test_run_with_help_and_unknown_subcommand_args_prints_help(self):
 
        command = commands.Command()
 
        with patch.object(command, 'print_help') as print_help:
 
            command.run('help', 'invalid-subcommand-name')
 
            print_help.assert_called_once_with()
 

	
 
    def test_run_with_help_and_subcommand_args_prints_subcommand_help(self):
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Mock()
 
        command.run('help', 'fake')
 
        fake.return_value.parser.print_help.assert_called_once_with()
 

	
 
    def test_run_with_unknown_subcommand_arg_prints_help(self):
 
        command = commands.Command()
 
        with patch.object(command, 'print_help') as print_help:
 
            command.run('invalid-command-name')
 
            print_help.assert_called_once_with()
 

	
 
    def test_stdout_may_be_redirected(self):
 
        class Fake(commands.Subcommand):
 
            def run(self, args):
 
                self.stdout.write("standard output stuff")
 
                self.stdout.flush()
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Fake
 
        tmp = TempIO()
 
        config_path = tmp.putfile('test.ini', '')
 
        out_path = tmp.putfile('out.txt', '')
 
        command.run('fake', '--config', config_path, '--stdout', out_path)
 
        with open(out_path) as f:
 
            self.assertEqual(f.read(), "standard output stuff")
 

	
 
    def test_stderr_may_be_redirected(self):
 
        class Fake(commands.Subcommand):
 
            def run(self, args):
 
                self.stderr.write("standard error stuff")
 
                self.stderr.flush()
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Fake
 
        tmp = TempIO()
 
        config_path = tmp.putfile('test.ini', '')
 
        err_path = tmp.putfile('err.txt', '')
 
        command.run('fake', '--config', config_path, '--stderr', err_path)
 
        with open(err_path) as f:
 
            self.assertEqual(f.read(), "standard error stuff")
 

	
 
    def test_verbose_flag_sets_root_logging_level_to_info(self):
 
        self.assertEqual(logging.getLogger().getEffectiveLevel(), logging.NOTSET)
 
        tmp = TempIO()
 
        config_path = tmp.putfile('test.ini', '')
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Mock()
 
        command.run('fake', '--config', config_path, '--verbose')
 
        self.assertEqual(logging.getLogger().getEffectiveLevel(), logging.INFO)
 

	
 
    def test_debug_flag_sets_root_logging_level_to_debug(self):
 
        self.assertEqual(logging.getLogger().getEffectiveLevel(), logging.NOTSET)
 
        tmp = TempIO()
 
        config_path = tmp.putfile('test.ini', '')
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Mock()
 
        command.run('fake', '--config', config_path, '--debug')
 
        self.assertEqual(logging.getLogger().getEffectiveLevel(), logging.DEBUG)
 

	
 
    def test_noinit_flag_means_no_config(self):
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Mock()
 
        command.run('fake', '--no-init')
 
        self.assertTrue(fake.return_value.config is None)
 

	
 

	
 
class TestSubcommand(TestCase):
 

	
 
    def test_repr(self):
 
        command = commands.Command()
 
        subcommand = commands.Subcommand(command)
 
        subcommand.name = 'fake-command'
 
        self.assertEqual(repr(subcommand), "Subcommand(name=u'fake-command')")
 

	
 
    def test_add_parser_args_does_nothing(self):
 
        command = commands.Command()
 
        subcommand = commands.Subcommand(command)
 
        # Not sure this is really the way to test this, but...
 
        self.assertEqual(len(subcommand.parser._action_groups[0]._actions), 1)
 
        subcommand.add_parser_args(subcommand.parser)
 
        self.assertEqual(len(subcommand.parser._action_groups[0]._actions), 1)
 

	
 
    def test_run_not_implemented(self):
 
        command = commands.Command()
 
        subcommand = commands.Subcommand(command)
 
        args = subcommand.parser.parse_args([])
 
        self.assertRaises(NotImplementedError, subcommand.run, args)
 

	
 

	
 
class TestAddUser(DataTestCase):
 
    
 
    def setUp(self):
 
        super(TestAddUser, self).setUp()
 
        self.tmp = TempIO()
 
        self.stdout_path = self.tmp.putfile('stdout.txt', '')
 
        self.stderr_path = self.tmp.putfile('stderr.txt', '')
 

	
 
    def test_no_user_created_if_username_already_exists(self):
 
        self.session.add(model.User(username='fred'))
 
        self.session.commit()
 
        self.assertEqual(self.session.query(model.User).count(), 1)
 
        commands.main('adduser', '--no-init', '--stderr', self.stderr_path, 'fred')
 
        with open(self.stderr_path) as f:
 
            self.assertEqual(f.read(), "User 'fred' already exists.\n")
 
        self.assertEqual(self.session.query(model.User).count(), 1)
 

	
 
    def test_no_user_created_if_password_prompt_is_canceled(self):
 
        self.assertEqual(self.session.query(model.User).count(), 0)
 
        with patch('rattail.commands.getpass') as getpass:
 
            getpass.side_effect = KeyboardInterrupt
 
            commands.main('adduser', '--no-init', '--stderr', self.stderr_path, 'fred')
 
        with open(self.stderr_path) as f:
 
            self.assertEqual(f.read(), "\nOperation was canceled.\n")
 
        self.assertEqual(self.session.query(model.User).count(), 0)
 

	
 
    def test_normal_user_created_with_correct_password_but_no_admin_role(self):
 
        self.assertEqual(self.session.query(model.User).count(), 0)
 
        with patch('rattail.commands.getpass') as getpass:
 
            getpass.return_value = 'fredpass'
 
            commands.main('adduser', '--no-init', '--stdout', self.stdout_path, 'fred')
 
        with open(self.stdout_path) as f:
 
            self.assertEqual(f.read(), "Created user: fred\n")
 
        fred = self.session.query(model.User).one()
 
        self.assertEqual(fred.username, 'fred')
 
        self.assertEqual(len(fred.roles), 0)
 
        user = authenticate_user(self.session, 'fred', 'fredpass')
 
        self.assertTrue(user is fred)
 

	
 
    def test_admin_user_created_with_administrator_role(self):
 
        self.assertEqual(self.session.query(model.User).count(), 0)
 
        with patch('rattail.commands.getpass') as getpass:
 
            getpass.return_value = 'fredpass'
 
            commands.main('adduser', '--no-init', '--stdout', self.stdout_path, 'fred', '--administrator')
 
        fred = self.session.query(model.User).one()
 
        self.assertEqual(len(fred.roles), 1)
 
        self.assertEqual(fred.roles[0].name, 'Administrator')
 

	
 

	
 
class TestDatabaseSync(TestCase):
 

	
 
    @patch('rattail.db.sync.linux.start_daemon')
 
    def test_start_daemon_with_default_args(self, start_daemon):
 
        commands.main('dbsync', '--no-init', 'start')
 
        start_daemon.assert_called_once_with(None, None, True)
 

	
 
    @patch('rattail.db.sync.linux.start_daemon')
 
    def test_start_daemon_with_explicit_args(self, start_daemon):
 
        tmp = TempIO()
 
        pid_path = tmp.putfile('test.pid', '')
 
        commands.main('dbsync', '--no-init', '--pidfile', pid_path, '--do-not-daemonize', 'start')
 
        start_daemon.assert_called_once_with(None, pid_path, False)
 

	
 
    @patch('rattail.db.sync.linux.start_daemon')
 
    def test_keyboard_interrupt_raises_error_when_daemonized(self, start_daemon):
 
        start_daemon.side_effect = KeyboardInterrupt
0 comments (0 inline, 0 general)