Changeset - 9694902d4cdd
[Not reviewed]
0 2 0
Lance Edgar - 9 years ago 2015-08-16 19:36:40
ledgar@sacfoodcoop.com
Remove some unused/unwanted command line arguments.

These weren't bad ideas, but hadn't really been implemented. Easier just
to get rid of them for now.
2 files changed with 1 insertions and 32 deletions:
0 comments (0 inline, 0 general)
rattail/commands.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
################################################################################
 
#
 
#  Rattail -- Retail Software Framework
 
#  Copyright © 2010-2015 Lance Edgar
 
#
 
#  This file is part of Rattail.
 
#
 
#  Rattail is free software: you can redistribute it and/or modify it under the
 
#  terms of the GNU Affero General Public License as published by the Free
 
#  Software Foundation, either version 3 of the License, or (at your option)
 
#  any later version.
 
#
 
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
 
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 
#  FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
 
#  more details.
 
#
 
#  You should have received a copy of the GNU Affero General Public License
 
#  along with Rattail.  If not, see <http://www.gnu.org/licenses/>.
 
#
 
################################################################################
 
"""
 
Console Commands
 
"""
 

	
 
from __future__ import absolute_import
 
from __future__ import unicode_literals
 

	
 
import os
 
import sys
 
import platform
 
import argparse
 
import datetime
 
import socket
 
import shutil
 
import warnings
 
import logging
 
from getpass import getpass
 

	
 
import edbob
 

	
 
from ._version import __version__
 
from .util import load_entry_points
 
from .console import Progress
 
from rattail.config import RattailConfig
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ArgumentParser(argparse.ArgumentParser):
 
    """
 
    Custom argument parser.
 

	
 
    This overrides some of the parsing logic which is specific to the primary
 
    command object.
 
    """
 

	
 
    def parse_args(self, args=None, namespace=None):
 
        args, argv = self.parse_known_args(args, namespace)
 
        args.argv = argv
 
        return args
 

	
 

	
 
def date_argument(string):
 
    """
 
    Validate and coerce a date argument.
 

	
 
    This function is designed be used as the ``type`` parameter when calling
 
    ``ArgumentParser.add_argument()``, e.g.::
 

	
 
       parser = ArgumentParser()
 
       parser.add_argument('--date', type=date_argument)
 
    """
 
    try:
 
        date = datetime.datetime.strptime(string, '%Y-%m-%d').date()
 
    except ValueError:
 
        raise argparse.ArgumentTypeError("Date must be in YYYY-MM-DD format")
 
    return date
 

	
 

	
 
class Command(object):
 
    """
 
    The primary command for the application.
 

	
 
    This effectively *is* the ``rattail`` console application.  It mostly
 
    provides the structure for subcommands, which really do all the work.
 

	
 
    This command is designed to be subclassed, should your application need
 
    similar functionality.
 
    """
 
    name = 'rattail'
 
    version = __version__
 
    description = "Retail Software Framework"
 
    long_description = """
 
Rattail is a retail software framework.
 

	
 
Copyright (c) 2010-2012 Lance Edgar <lance@edbob.org>
 
Copyright (c) 2010-2015 Lance Edgar <lance@edbob.org>
 

	
 
This program comes with ABSOLUTELY NO WARRANTY.  This is free software,
 
and you are welcome to redistribute it under certain conditions.
 
See the file COPYING.txt for more information.
 
"""
 

	
 
    stdout = sys.stdout
 
    stderr = sys.stderr
 

	
 
    def __init__(self):
 
        self.subcommands = load_entry_points('{0}.commands'.format(self.name))
 

	
 
    def __unicode__(self):
 
        return unicode(self.name)
 

	
 
    @property
 
    def db_config_section(self):
 
        """
 
        Name of section in config file which should have database connection
 
        info.  This defaults to ``'rattail.db'`` but may be overridden so the
 
        command framework can sit in front of a non-Rattail database if needed.
 

	
 
        This is used to auto-configure a "default" database engine for the app,
 
        when any command is invoked.
 
        """
 
        return 'rattail.db'
 

	
 
    @property
 
    def db_session_factory(self):
 
        """
 
        Reference to the "primary" ``Session`` class, which will be configured
 
        automatically during app startup.  Defaults to :class:`rattail.db.Session`.
 
        """
 
        from rattail.db import Session
 
        return Session
 

	
 
    def iter_subcommands(self):
 
        """
 
        Iterate over the subcommands.
 

	
 
        This is a generator which yields each associated :class:`Subcommand`
 
        class sorted by :attr:`Subcommand.name`.
 
        """
 
        for name in sorted(self.subcommands):
 
            yield self.subcommands[name]
 

	
 
    def print_help(self):
 
        """
 
        Print help text for the primary command.
 

	
 
        The output will include a list of available subcommands.
 
        """
 
        self.stdout.write("""{0}
 

	
 
Usage: {1} [options] <command> [command-options]
 

	
 
Options:
 
  -c PATH, --config=PATH
 
                    Config path (may be specified more than once)
 
  -n, --no-init     Don't load config before executing command
 
  -d, --debug       Increase logging level to DEBUG
 
  -P, --progress    Show progress indicators (where relevant)
 
  -v, --verbose     Increase logging level to INFO
 
  -V, --version     Display program version and exit
 

	
 
Commands:\n""".format(self.description, self.name))
 

	
 
        for cmd in self.iter_subcommands():
 
            self.stdout.write("  {0:<16s}  {1}\n".format(cmd.name, cmd.description))
 

	
 
        self.stdout.write("\nTry '{0} help <command>' for more help.\n".format(self.name))
 

	
 
    def run(self, *args):
 
        """
 
        Parse command line arguments and execute appropriate subcommand.
 
        """
 
        parser = ArgumentParser(
 
            prog=self.name,
 
            description=self.description,
 
            add_help=False,
 
            )
 

	
 
        parser.add_argument('-c', '--config', action='append', dest='config_paths',
 
                            metavar='PATH')
 
        parser.add_argument('-d', '--debug', action='store_true', dest='debug')
 
        parser.add_argument('-n', '--no-init', action='store_true', default=False)
 
        parser.add_argument('-P', '--progress', action='store_true', default=False)
 
        parser.add_argument('--stdout', metavar='PATH', type=argparse.FileType('w'),
 
                            help="Optional path to which STDOUT should be effectively redirected.")
 
        parser.add_argument('--stderr', metavar='PATH', type=argparse.FileType('w'),
 
                            help="Optional path to which STDERR should be effectively redirected.")
 
        parser.add_argument('-v', '--verbose', action='store_true', dest='verbose')
 
        parser.add_argument('-V', '--version', action='version',
 
                            version="%(prog)s {0}".format(self.version))
 
        parser.add_argument('command', nargs='*')
 

	
 
        # Parse args and determine subcommand.
 
        args = parser.parse_args(list(args))
 
        if not args or not args.command:
 
            self.print_help()
 
            return
 

	
 
        # Show (sub)command help if so instructed, or unknown subcommand.
 
        cmd = args.command.pop(0)
 
        if cmd == 'help':
 
            if len(args.command) != 1:
 
                self.print_help()
 
                return
 
            cmd = args.command[0]
 
            if cmd not in self.subcommands:
 
                self.print_help()
 
                return
 
            cmd = self.subcommands[cmd](parent=self)
 
            cmd.parser.print_help()
 
            return
 
        elif cmd not in self.subcommands:
 
            self.print_help()
 
            return
 

	
 
        # Okay, we should be done needing to print help messages.  Now it's
 
        # safe to redirect STDOUT/STDERR, if necessary.
 
        if args.stdout:
 
            self.stdout = args.stdout
 
        if args.stderr:
 
            self.stderr = args.stderr
 

	
 
        # Basic logging should be established before init()ing.
 

	
 
        # Use root logger if setting logging flags.
 
        log = logging.getLogger()
 

	
 
        # TODO: Figure out the basic logging pattern we're after here.
 
        # edbob.basic_logging()
 
        logging.basicConfig()
 

	
 
        if args.verbose:
 
            log.setLevel(logging.INFO)
 
        if args.debug:
 
            log.setLevel(logging.DEBUG)
 

	
 
        # Initialize everything...
 
        config = None
 
        if not args.no_init:
 
            edbob.init(self.name, *(args.config_paths or []))
 
            config = RattailConfig(edbob.config)
 

	
 
            # Command line logging flags should override config.
 
            if args.verbose:
 
                log.setLevel(logging.INFO)
 
            if args.debug:
 
                log.setLevel(logging.DEBUG)
 

	
 
            # Configure the default database engine.
 
            try:
 
                from rattail.db.util import configure_session_factory
 
            except ImportError:
 
                pass            # assume no sqlalchemy
 
            else:
 
                configure_session_factory(config, section=self.db_config_section,
 
                                          session_factory=self.db_session_factory)
 

	
 
                # Maybe configure Continuum versioning.
 
                if (self.db_config_section == 'rattail.db'
 
                    and config.getboolean('rattail.db', 'versioning.enabled')):
 
                    from rattail.db.continuum  import configure_versioning
 
                    configure_versioning(config)
 

	
 
        # And finally, do something of real value...
 
        cmd = self.subcommands[cmd](self)
 
        cmd.config = config
 
        cmd.show_progress = args.progress
 
        cmd.progress = Progress if args.progress else None
 
        cmd._run(*(args.command + args.argv))
 

	
 

	
 
class Subcommand(object):
 
    """
 
    Base class for application subcommands.
 
    """
 
    name = 'UNDEFINED'
 
    description = 'UNDEFINED'
 

	
 
    def __init__(self, parent=None, show_progress=None):
 
        self.parent = parent
 
        self.stdout = getattr(parent, 'stdout', sys.stdout)
 
        self.stderr = getattr(parent, 'stderr', sys.stderr)
 
        self.show_progress = show_progress
 
        self.progress = Progress if show_progress else None
 
        self.parser = argparse.ArgumentParser(
 
            prog='{0} {1}'.format(getattr(self.parent, 'name', 'UNDEFINED'), self.name),
 
            description=self.description)
 
        self.add_parser_args(self.parser)
 

	
 
    def __repr__(self):
 
        return "Subcommand(name={0})".format(repr(self.name))
 

	
 
    def add_parser_args(self, parser):
 
        """
 
        Configure additional arguments for the subcommand argument parser.
 
        """
 
        pass
 
            
 
    def _run(self, *args):
 
        args = self.parser.parse_args(list(args))
 
        return self.run(args)
 

	
 
    def run(self, args):
 
        """
 
        Run the subcommand logic.
 
        """
 
        raise NotImplementedError
 

	
 

	
 
class AddUser(Subcommand):
 
    """
 
    Adds a user to the database.
 
    """
 

	
 
    name = 'adduser'
 
    description = "Add a user to the database."
 

	
 
    def add_parser_args(self, parser):
 
        parser.add_argument('username',
 
                            help="Username for the new account.")
 
        parser.add_argument('-A', '--administrator',
 
                            action='store_true',
 
                            help="Add the new user to the Administrator role.")
 

	
 
    def run(self, args):
 
        from rattail.db import Session
 
        from rattail.db import model
 
        from rattail.db.auth import set_user_password, administrator_role
 

	
 
        session = Session()
 

	
 
        if session.query(model.User).filter_by(username=args.username).count():
 
            session.close()
 
            self.stderr.write("User '{0}' already exists.\n".format(args.username))
 
            return
 

	
 
        passwd = ''
 
        while not passwd:
 
            try:
 
                passwd = getpass("Enter a password for user '{0}': ".format(args.username))
 
            except KeyboardInterrupt:
 
                self.stderr.write("\nOperation was canceled.\n")
 
                return
 

	
 
        user = model.User(username=args.username)
 
        set_user_password(user, passwd)
 
        if args.administrator:
 
            user.roles.append(administrator_role(session))
 
        session.add(user)
 
        session.commit()
 
        session.close()
 
        self.stdout.write("Created user: {0}\n".format(args.username))
 

	
 

	
 
class EmailBouncer(Subcommand):
 
    """
 
    Interacts with the email bouncer daemon.  This command expects a
 
    subcommand; one of the following:
 

	
 
    * ``rattail bouncer start``
 
    * ``rattail bouncer stop``
 
    """
 
    name = 'bouncer'
 
    description = "Manage the email bouncer daemon"
 

	
 
    def add_parser_args(self, parser):
 
        subparsers = parser.add_subparsers(title='subcommands')
 

	
 
        start = subparsers.add_parser('start', help="Start service")
 
        start.set_defaults(subcommand='start')
 
        stop = subparsers.add_parser('stop', help="Stop service")
 
        stop.set_defaults(subcommand='stop')
 

	
 
        parser.add_argument('-p', '--pidfile', metavar='PATH', default='/var/run/rattail/bouncer.pid',
 
                            help="Path to PID file.")
 
        parser.add_argument('-D', '--do-not-daemonize',
 
                            action='store_false', dest='daemonize', default=True,
 
                            help="Do not daemonize when starting.")
 

	
 
    def run(self, args):
 
        from rattail.bouncer.daemon import BouncerDaemon
 

	
 
        daemon = BouncerDaemon(args.pidfile, config=self.config)
 
        if args.subcommand == 'stop':
 
            daemon.stop()
 
        else: # start
 
            try:
 
                daemon.start(daemonize=args.daemonize)
 
            except KeyboardInterrupt:
 
                if not args.daemonize:
 
                    self.stderr.write("Interrupted.\n")
 
                else:
 
                    raise
 

	
 

	
 
class DateOrganize(Subcommand):
 
    """
 
    Organize files in a given directory, according to date.
 
    """
 
    name = 'date-organize'
 
    description = "Organize files in a given directory according to date."
 

	
 
    def add_parser_args(self, parser):
 
        parser.add_argument('folder', metavar='PATH',
 
                            help="Path to directory containing files which are "
 
                            "to be organized by date.")
 

	
 
    def run(self, args):
 
        today = datetime.date.today()
 
        for filename in sorted(os.listdir(args.folder)):
 
            path = os.path.join(args.folder, filename)
 
            if os.path.isfile(path):
 
                mtime = datetime.datetime.fromtimestamp(os.path.getmtime(path))
 
                if mtime.date() < today:
 
                    datedir = mtime.strftime(os.sep.join(('%Y', '%m', '%d')))
 
                    datedir = os.path.join(args.folder, datedir)
 
                    if not os.path.exists(datedir):
 
                        os.makedirs(datedir)
 
                    shutil.move(path, datedir)
 

	
 

	
 
class DatabaseSyncCommand(Subcommand):
 
    """
 
    Controls the database synchronization service.
 
    """
 

	
 
    name = 'dbsync'
 
    description = "Manage the database synchronization service"
 

	
 
    def add_parser_args(self, parser):
 
        subparsers = parser.add_subparsers(title='subcommands')
 

	
 
        start = subparsers.add_parser('start', help="Start service")
 
        start.set_defaults(subcommand='start')
 
        stop = subparsers.add_parser('stop', help="Stop service")
 
        stop.set_defaults(subcommand='stop')
 

	
 
        if sys.platform == 'linux2':
 
            parser.add_argument('-p', '--pidfile',
 
                                help="Path to PID file", metavar='PATH')
 
            parser.add_argument('-D', '--do-not-daemonize',
 
                                action='store_false', dest='daemonize', default=True,
 
                                help="Do not daemonize when starting.")
 

	
 
    def run(self, args):
 
        from rattail.db.sync import linux as dbsync
 

	
 
        if args.subcommand == 'start':
 
            try:
 
                dbsync.start_daemon(self.config, args.pidfile, args.daemonize)
 
            except KeyboardInterrupt:
 
                if not args.daemonize:
 
                    self.stderr.write("Interrupted.\n")
 
                else:
 
                    raise
 

	
 
        elif args.subcommand == 'stop':
 
            dbsync.stop_daemon(self.config, args.pidfile)
 

	
 

	
 
class Dump(Subcommand):
 
    """
 
    Do a simple data dump.
 
    """
 

	
 
    name = 'dump'
 
    description = "Dump data to file."
 

	
 
    def add_parser_args(self, parser):
 
        parser.add_argument(
 
            '--output', '-o', metavar='FILE',
 
            help="Optional path to output file.  If none is specified, "
 
            "data will be written to standard output.")
 
        parser.add_argument(
 
            'model', help="Model whose data will be dumped.")
 

	
 
    def get_model(self):
 
        """
 
        Returns the module which contains all relevant data models.
 

	
 
        By default this returns ``rattail.db.model``, but this method may be
 
        overridden in derived commands to add support for extra data models.
 
        """
 
        from rattail.db import model
 
        return model
 

	
 
    def run(self, args):
 
        from rattail.db import Session
 
        from rattail.db.dump import dump_data
 

	
 
        model = self.get_model()
 
        if hasattr(model, args.model):
 
            cls = getattr(model, args.model)
 
        else:
 
            self.stderr.write("Unknown model: {0}\n".format(args.model))
 
            sys.exit(1)
 

	
 
        progress = None
 
        if self.show_progress: # pragma no cover
 
            progress = Progress
 

	
 
        if args.output:
 
            output = open(args.output, 'wb')
 
        else:
 
            output = self.stdout
 

	
 
        session = Session()
 
        dump_data(session, cls, output, progress=progress)
 
        session.close()
 

	
 
        if output is not self.stdout:
 
            output.close()
 

	
 

	
 
class FileMonitorCommand(Subcommand):
 
    """
 
    Interacts with the file monitor service; called as ``rattail filemon``.
 
    This command expects a subcommand; one of the following:
 

	
 
    * ``rattail filemon start``
 
    * ``rattail filemon stop``
 

	
 
    On Windows platforms, the following additional subcommands are available:
 

	
 
    * ``rattail filemon install``
 
    * ``rattail filemon uninstall`` (or ``rattail filemon remove``)
 

	
 
    .. note::
 
       The Windows Vista family of operating systems requires you to launch
 
       ``cmd.exe`` as an Administrator in order to have sufficient rights to
 
       run the above commands.
 

	
 
    .. See :doc:`howto.use_filemon` for more information.
 
    """
 

	
 
    name = 'filemon'
 
    description = "Manage the file monitor service"
 

	
 
    def add_parser_args(self, parser):
 
        subparsers = parser.add_subparsers(title='subcommands')
 

	
 
        start = subparsers.add_parser('start', help="Start service")
 
        start.set_defaults(subcommand='start')
 
        stop = subparsers.add_parser('stop', help="Stop service")
 
        stop.set_defaults(subcommand='stop')
 

	
 
        if sys.platform == 'linux2':
 
            parser.add_argument('-p', '--pidfile',
 
                                help="Path to PID file.", metavar='PATH')
 
            parser.add_argument('-D', '--do-not-daemonize',
 
                                action='store_false', dest='daemonize', default=True,
 
                                help="Do not daemonize when starting.")
 

	
 
        elif sys.platform == 'win32': # pragma no cover
 

	
 
            install = subparsers.add_parser('install', help="Install service")
 
            install.set_defaults(subcommand='install')
 
            install.add_argument('-a', '--auto-start', action='store_true',
 
                                 help="Configure service to start automatically.")
 
            install.add_argument('-U', '--username',
 
                                 help="User account under which the service should run.")
 

	
 
            remove = subparsers.add_parser('remove', help="Uninstall (remove) service")
 
            remove.set_defaults(subcommand='remove')
 

	
 
            uninstall = subparsers.add_parser('uninstall', help="Uninstall (remove) service")
 
            uninstall.set_defaults(subcommand='remove')
 

	
 
    def run(self, args):
 
        if sys.platform == 'linux2':
 
            from rattail.filemon import linux as filemon
 

	
 
            if args.subcommand == 'start':
 
                filemon.start_daemon(self.config, args.pidfile, args.daemonize)
 

	
 
            elif args.subcommand == 'stop':
 
                filemon.stop_daemon(self.config, args.pidfile)
 

	
 
        elif sys.platform == 'win32': # pragma no cover
 
            self.run_win32(args)
 

	
 
        else:
 
            self.stderr.write("File monitor is not supported on platform: {0}\n".format(sys.platform))
 
            sys.exit(1)
 

	
 
    def run_win32(self, args): # pragma no cover
 
        from rattail.win32 import require_elevation
 
        from rattail.win32 import service
 
        from rattail.win32 import users
 
        from rattail.filemon import win32 as filemon
 

	
 
        require_elevation()
 

	
 
        options = []
 
        if args.subcommand == 'install':
 

	
 
            username = args.username
 
            if username:
 
                if '\\' in username:
 
                    server, username = username.split('\\')
 
                else:
 
                    server = socket.gethostname()
 
                if not users.user_exists(username, server):
 
                    sys.stderr.write("User does not exist: {0}\\{1}\n".format(server, username))
 
                    sys.exit(1)
 

	
 
                password = ''
 
                while password == '':
 
                    password = getpass("Password for service user: ").strip()
 
                options.extend(['--username', r'{0}\{1}'.format(server, username)])
 
                options.extend(['--password', password])
 

	
 
            if args.auto_start:
 
                options.extend(['--startup', 'auto'])
 

	
 
        service.execute_service_command(filemon, args.subcommand, *options)
 

	
 
        # If installing with custom user, grant "logon as service" right.
 
        if args.subcommand == 'install' and args.username:
 
            users.allow_logon_as_service(username)
 

	
 
        # TODO: Figure out if the following is even required, or if instead we
 
        # should just be passing '--startup delayed' to begin with?
 

	
 
        # If installing auto-start service on Windows 7, we should update
 
        # its startup type to be "Automatic (Delayed Start)".
 
        # TODO: Improve this check to include Vista?
 
        if args.subcommand == 'install' and args.auto_start:
 
            if platform.release() == '7':
 
                name = filemon.RattailFileMonitor._svc_name_
 
                service.delayed_auto_start_service(name)
 

	
 

	
 
class ImportSubcommand(Subcommand):
 
    """
 
    Base class for subcommands which use the data importing system.
 
    """
 
    supports_versioning = True
 

	
 
    def add_parser_args(self, parser):
 
        if self.supports_versioning:
 
            parser.add_argument('--no-versioning', action='store_true',
 
                                help="Disables versioning during the import.  This is "
 
                                "intended to be useful e.g. during initial import, where "
 
                                "the process can be quite slow even without the overhead "
 
                                "of versioning.")
 
        parser.add_argument('--warnings', '-W', action='store_true',
 
                            help="Whether to log warnings if any data model "
 
                            "writes occur.  Intended to help stay in sync "
 
                            "with an external data source.")
 
        parser.add_argument('--max-updates', type=int,
 
                            help="Maximum number of record updates (or additions) which, if "
 
                            "reached, should cause the importer to stop early.  Note that the "
 
                            "updates which have completed will be committed unless a dry run "
 
                            "is in effect.")
 
        parser.add_argument('--dry-run', action='store_true',
 
                            help="Go through the motions and allow logging to occur, "
 
                            "but do not actually commit the transaction at the end.")
 
        parser.add_argument('models', nargs='*', metavar='MODEL',
 
                            help="One or more models to import.  If not specified, "
 
                            "then all supported models will be imported.")
 

	
 
    def run(self, args):
 
        log.info("begin {0} for data model(s): {1}".format(
 
                self.name, ', '.join(args.models or ["ALL"])))
 

	
 
        Session = self.parent.db_session_factory
 
        if self.supports_versioning:
 
            if args.no_versioning:
 
                from rattail.db.continuum  import disable_versioning
 
                disable_versioning()
 
            session = Session(continuum_user=self.continuum_user)
 
        else:
 
            session = Session()
 

	
 
        self.import_data(args, session)
 

	
 
        if args.dry_run:
 
            session.rollback()
 
            log.info("dry run, so transaction was rolled back")
 
        else:
 
            session.commit()
 
            log.info("transaction was committed")
 
        session.close()
 

	
 
    @property
 
    def continuum_user(self):
 
        """
 
        Info needed to assign the Continuum user for the database session.
 
        """
 

	
 
    def import_data(self, args, session):
 
        """
 
        Custom logic for importing data, to be implemented by subclass.
 
        """
 

	
 

	
 
class ImportCSV(ImportSubcommand):
 
    """
 
    Import data from a CSV file.
 
    """
 
    name = 'import-csv'
 
    description = "Import data from a CSV file."
 

	
 
    def add_parser_args(self, parser):
 
        super(ImportCSV, self).add_parser_args(parser)
 
        parser.add_argument('importer',
 
                            help="Spec string for importer class which should handle the import.")
 
        parser.add_argument('csv_path',
 
                            help="Path to the data file which will be imported.")
 

	
 
    def import_data(self, args, session):
 
        from rattail.db.importing.providers.csv import make_provider
 

	
 
        provider = make_provider(self.config, session, args.importer, data_path=args.csv_path)
 
        data = provider.get_data(progress=self.progress)
 
        affected = provider.importer.import_data(data, provider.supported_fields, 'uuid',
 
                                                 progress=self.progress)
 
        log.info("added or updated {0} {1} records".format(affected, provider.model_name))
 

	
 

	
 
class InitializeDatabase(Subcommand):
 
    """
 
    Creates the initial Rattail tables within a database.
 
    """
 

	
 
    name = 'initdb'
 
    description = "Create initial tables in a database."
 

	
 
    def add_parser_args(self, parser):
 
        parser.add_argument('url', metavar='URL',
 
                            help="Database engine URL")
 

	
 
    def run(self, args):
 
        from sqlalchemy import create_engine
 
        from alembic.util import obfuscate_url_pw
 
        from rattail.db import model
 
        
 
        engine = create_engine(args.url)
 
        model.Base.metadata.create_all(engine)
 
        print("Created initial tables for database:")
 
        print("  {0}".format(obfuscate_url_pw(engine.url)))
 

	
 

	
 
class LoadHostDataCommand(Subcommand):
 
    """
 
    Loads data from the Rattail host database, if one is configured.
 
    """
 

	
 
    name = 'load-host-data'
 
    description = "Load data from host database"
 

	
 
    def run(self, args):
 
        from .db import get_engines
 
        from .db import load
 

	
 
        engines = get_engines(self.config)
 
        if 'host' not in engines:
 
            sys.stderr.write("Host engine URL not configured.\n")
 
            sys.exit(1)
 

	
 
        proc = load.LoadProcessor(self.config)
 
        proc.load_all_data(engines['host'], Progress)
 

	
 

	
 
class MakeConfigCommand(Subcommand):
 
    """
 
    Creates a sample configuration file.
 
    """
 

	
 
    name = 'make-config'
 
    description = "Create a configuration file"
 

	
 
    def add_parser_args(self, parser):
 
        parser.add_argument('path', default='rattail.conf', metavar='PATH',
 
                            help="Path to the new file")
 
        parser.add_argument('-f', '--force', action='store_true',
 
                            help="Overwrite an existing file")
 

	
 

	
 
    def run(self, args):
 
        import os
 
        import os.path
 
        import shutil
 
        from rattail.files import resource_path
 

	
 
        dest = os.path.abspath(args.path)
 
        if os.path.exists(dest):
 
            if os.path.isdir(dest):
 
                sys.stderr.write("Path must include the filename; "
 
                                 "you gave a directory:\n  {0}\n".format(dest))
 
                sys.exit(1)
 
            if not args.force:
 
                sys.stderr.write("File already exists "
 
                                 "(use --force to overwrite):\n  "
 
                                 "{0}\n".format(dest))
 
                sys.exit(1)
 
            os.remove(dest)
 

	
 
        src = resource_path('rattail:data/rattail.conf.sample')
 
        shutil.copyfile(src, dest)
 

	
 

	
 
class MakeUserCommand(Subcommand):
 
    """
 
    Creates a system user for Rattail.
 
    """
 
    name = 'make-user'
 
    description = "Create a system user account for Rattail"
 

	
 
    def add_parser_args(self, parser):
 
        parser.add_argument('-U', '--username', metavar='USERNAME', default='rattail',
 
                            help="Username for the new user; defaults to 'rattail'.")
 
        parser.add_argument('--full-name', metavar='FULL_NAME',
 
                            help="Full (display) name for the new user.")
 
        parser.add_argument('--comment', metavar='COMMENT',
 
                            help="Comment string for the new user.")
 

	
 
    def run(self, args):
 
        if sys.platform != 'win32':
 
            sys.stderr.write("Sorry, only win32 platform is supported.\n")
 
            sys.exit(1)
 

	
 
        from rattail.win32 import users
 
        from rattail.win32 import require_elevation
 

	
 
        require_elevation()
 

	
 
        if users.user_exists(args.username):
 
            sys.stderr.write("User already exists: {0}\n".format(args.username))
 
            sys.exit(1)
 

	
 
        try:
 
            password = None
 
            while not password:
 
                password = getpass(b"Enter a password: ").strip()
 
        except KeyboardInterrupt:
 
            sys.stderr.write("Operation canceled by user.")
 
            sys.exit(2)
 

	
 
        users.create_user(args.username, password,
 
                          full_name=args.full_name, comment=args.comment)
 
        sys.stdout.write("Created user: {0}\n".format(args.username))
 

	
 

	
 
class PalmCommand(Subcommand):
 
    """
 
    Manages registration for the HotSync Manager conduit; called as::
 

	
 
       rattail palm
 
    """
 

	
 
    name = 'palm'
 
    description = "Manage the HotSync Manager conduit registration"
 

	
 
    def add_parser_args(self, parser):
 
        subparsers = parser.add_subparsers(title='subcommands')
 

	
 
        register = subparsers.add_parser('register', help="Register Rattail conduit")
 
        register.set_defaults(subcommand='register')
 

	
 
        unregister = subparsers.add_parser('unregister', help="Unregister Rattail conduit")
 
        unregister.set_defaults(subcommand='unregister')
 

	
 
    def run(self, args):
 
        from rattail import palm
 
        from rattail.win32 import require_elevation
 
        from rattail.exceptions import PalmError
 

	
 
        require_elevation()
 

	
 
        if args.subcommand == 'register':
 
            try:
 
                palm.register_conduit()
 
            except PalmError, error:
 
                sys.stderr.write(str(error) + '\n')
 

	
 
        elif args.subcommand == 'unregister':
 
            try:
 
                palm.unregister_conduit()
 
            except PalmError, error:
 
                sys.stderr.write(str(error) + '\n')
 
                
 

	
 
class PurgeBatchesCommand(Subcommand):
 
    """
 
    Purges stale batches from the database; called as:
 

	
 
    .. code-block:: sh
 

	
 
       rattail purge-batches
 
    """
 
    name = 'purge-batches'
 
    description = "Purge stale batches from the database"
 

	
 
    def add_parser_args(self, parser):
 
        parser.add_argument('-A', '--all', action='store_true',
 
                            help="Purge ALL batches regardless of purge date")
 
        parser.add_argument('--date', '-D', type=date_argument,
 
                            help="Optional effective date for the purge.  If "
 
                            "none is specified, the current date is assumed.")
 

	
 
    def run(self, args):
 
        from rattail.db import Session
 
        from rattail.db.util import obfuscate_url_pw
 
        from rattail.db.batches import util
 

	
 
        log.info("purging batches from database: {0}".format(obfuscate_url_pw(Session.kw['bind'].url)))
 
        normal = util.purge_batches(effective_date=args.date, purge_everything=args.all)
 
        orphaned = util.purge_orphaned_batches()
 
        log.info("purged {0} normal and {1} orphaned batches".format(normal, orphaned))
 

	
 

	
 
def main(*args):
 
    """
 
    The primary entry point for the Rattail command system.
 
    """
 
    # TODO: This should maybe be configurable or something?  For now I'm just tired of seeing it.
 
    warnings.filterwarnings(
 
        'ignore',
 
        r'^Python 2\.6 is no longer supported by the Python core team\, please upgrade your Python\.$',
 
        DeprecationWarning,
 
        r'^cryptography$',
 
    )
 

	
 
    if args:
 
        args = list(args)
 
    else:
 
        args = sys.argv[1:]
 

	
 
    cmd = Command()
 
    cmd.run(*args)
tests/test_commands.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from __future__ import unicode_literals
 

	
 
import csv
 
import datetime
 
import argparse
 
import logging
 
from unittest import TestCase
 
from cStringIO import StringIO
 

	
 
from mock import patch, Mock
 
from fixture import TempIO
 

	
 
from sqlalchemy import create_engine
 
from sqlalchemy import func
 

	
 
from . import DataTestCase
 
from rattail import commands
 
from rattail.db import Session
 
from rattail.db import model
 
from rattail.db.auth import authenticate_user
 

	
 

	
 
class TestArgumentParser(TestCase):
 

	
 
    def test_parse_args_preserves_extra_argv(self):
 
        parser = commands.ArgumentParser()
 
        parser.add_argument('--some-optional-arg')
 
        parser.add_argument('some_required_arg')
 
        args = parser.parse_args([
 
                '--some-optional-arg', 'optional-value', 'required-value',
 
                'some', 'extra', 'args'])
 
        self.assertEqual(args.some_required_arg, 'required-value')
 
        self.assertEqual(args.some_optional_arg, 'optional-value')
 
        self.assertEqual(args.argv, ['some', 'extra', 'args'])
 

	
 

	
 
class TestDateArgument(TestCase):
 

	
 
    def test_valid_date_string_returns_date_object(self):
 
        date = commands.date_argument('2014-01-01')
 
        self.assertEqual(date, datetime.date(2014, 1, 1))
 

	
 
    def test_invalid_date_string_raises_error(self):
 
        self.assertRaises(argparse.ArgumentTypeError, commands.date_argument, 'invalid-date')
 

	
 

	
 
class TestCommand(TestCase):
 

	
 
    def test_initial_subcommands_are_sane(self):
 
        command = commands.Command()
 
        self.assertTrue('filemon' in command.subcommands)
 

	
 
    def test_unicode(self):
 
        command = commands.Command()
 
        command.name = 'some-app'
 
        self.assertEqual(unicode(command), u'some-app')
 
        
 
    def test_iter_subcommands_includes_expected_item(self):
 
        command = commands.Command()
 
        found = False
 
        for subcommand in command.iter_subcommands():
 
            if subcommand.name == 'filemon':
 
                found = True
 
                break
 
        self.assertTrue(found)
 

	
 
    def test_print_help(self):
 
        command = commands.Command()
 
        stdout = StringIO()
 
        command.stdout = stdout
 
        command.print_help()
 
        output = stdout.getvalue()
 
        stdout.close()
 
        self.assertTrue('Usage:' in output)
 
        self.assertTrue('Options:' in output)
 

	
 
    def test_run_with_no_args_prints_help(self):
 
        command = commands.Command()
 
        with patch.object(command, 'print_help') as print_help:
 
            command.run()
 
            print_help.assert_called_once_with()
 

	
 
    def test_run_with_single_help_arg_prints_help(self):
 
        command = commands.Command()
 
        with patch.object(command, 'print_help') as print_help:
 
            command.run('help')
 
            print_help.assert_called_once_with()
 

	
 
    def test_run_with_help_and_unknown_subcommand_args_prints_help(self):
 
        command = commands.Command()
 
        with patch.object(command, 'print_help') as print_help:
 
            command.run('help', 'invalid-subcommand-name')
 
            print_help.assert_called_once_with()
 

	
 
    def test_run_with_help_and_subcommand_args_prints_subcommand_help(self):
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Mock()
 
        command.run('help', 'fake')
 
        fake.return_value.parser.print_help.assert_called_once_with()
 

	
 
    def test_run_with_unknown_subcommand_arg_prints_help(self):
 
        command = commands.Command()
 
        with patch.object(command, 'print_help') as print_help:
 
            command.run('invalid-command-name')
 
            print_help.assert_called_once_with()
 

	
 
    def test_stdout_may_be_redirected(self):
 
        class Fake(commands.Subcommand):
 
            def run(self, args):
 
                self.stdout.write("standard output stuff")
 
                self.stdout.flush()
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Fake
 
        tmp = TempIO()
 
        config_path = tmp.putfile('test.ini', '')
 
        out_path = tmp.putfile('out.txt', '')
 
        command.run('fake', '--config', config_path, '--stdout', out_path)
 
        with open(out_path) as f:
 
            self.assertEqual(f.read(), "standard output stuff")
 

	
 
    def test_stderr_may_be_redirected(self):
 
        class Fake(commands.Subcommand):
 
            def run(self, args):
 
                self.stderr.write("standard error stuff")
 
                self.stderr.flush()
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Fake
 
        tmp = TempIO()
 
        config_path = tmp.putfile('test.ini', '')
 
        err_path = tmp.putfile('err.txt', '')
 
        command.run('fake', '--config', config_path, '--stderr', err_path)
 
        with open(err_path) as f:
 
            self.assertEqual(f.read(), "standard error stuff")
 

	
 
    def test_verbose_flag_sets_root_logging_level_to_info(self):
 
        self.assertEqual(logging.getLogger().getEffectiveLevel(), logging.NOTSET)
 
        tmp = TempIO()
 
        config_path = tmp.putfile('test.ini', '')
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Mock()
 
        command.run('fake', '--config', config_path, '--verbose')
 
        self.assertEqual(logging.getLogger().getEffectiveLevel(), logging.INFO)
 

	
 
    def test_debug_flag_sets_root_logging_level_to_debug(self):
 
        self.assertEqual(logging.getLogger().getEffectiveLevel(), logging.NOTSET)
 
        tmp = TempIO()
 
        config_path = tmp.putfile('test.ini', '')
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Mock()
 
        command.run('fake', '--config', config_path, '--debug')
 
        self.assertEqual(logging.getLogger().getEffectiveLevel(), logging.DEBUG)
 

	
 
    def test_noinit_flag_means_no_config(self):
 
        command = commands.Command()
 
        fake = command.subcommands['fake'] = Mock()
 
        command.run('fake', '--no-init')
 
        self.assertTrue(fake.return_value.config is None)
 

	
 

	
 
class TestSubcommand(TestCase):
 

	
 
    def test_repr(self):
 
        command = commands.Command()
 
        subcommand = commands.Subcommand(command)
 
        subcommand.name = 'fake-command'
 
        self.assertEqual(repr(subcommand), "Subcommand(name=u'fake-command')")
 

	
 
    def test_add_parser_args_does_nothing(self):
 
        command = commands.Command()
 
        subcommand = commands.Subcommand(command)
 
        # Not sure this is really the way to test this, but...
 
        self.assertEqual(len(subcommand.parser._action_groups[0]._actions), 1)
 
        subcommand.add_parser_args(subcommand.parser)
 
        self.assertEqual(len(subcommand.parser._action_groups[0]._actions), 1)
 

	
 
    def test_run_not_implemented(self):
 
        command = commands.Command()
 
        subcommand = commands.Subcommand(command)
 
        args = subcommand.parser.parse_args([])
 
        self.assertRaises(NotImplementedError, subcommand.run, args)
 

	
 

	
 
class TestAddUser(DataTestCase):
 
    
 
    def setUp(self):
 
        super(TestAddUser, self).setUp()
 
        self.tmp = TempIO()
 
        self.stdout_path = self.tmp.putfile('stdout.txt', '')
 
        self.stderr_path = self.tmp.putfile('stderr.txt', '')
 

	
 
    def test_no_user_created_if_username_already_exists(self):
 
        self.session.add(model.User(username='fred'))
 
        self.session.commit()
 
        self.assertEqual(self.session.query(model.User).count(), 1)
 
        commands.main('adduser', '--no-init', '--stderr', self.stderr_path, 'fred')
 
        with open(self.stderr_path) as f:
 
            self.assertEqual(f.read(), "User 'fred' already exists.\n")
 
        self.assertEqual(self.session.query(model.User).count(), 1)
 

	
 
    def test_no_user_created_if_password_prompt_is_canceled(self):
 
        self.assertEqual(self.session.query(model.User).count(), 0)
 
        with patch('rattail.commands.getpass') as getpass:
 
            getpass.side_effect = KeyboardInterrupt
 
            commands.main('adduser', '--no-init', '--stderr', self.stderr_path, 'fred')
 
        with open(self.stderr_path) as f:
 
            self.assertEqual(f.read(), "\nOperation was canceled.\n")
 
        self.assertEqual(self.session.query(model.User).count(), 0)
 

	
 
    def test_normal_user_created_with_correct_password_but_no_admin_role(self):
 
        self.assertEqual(self.session.query(model.User).count(), 0)
 
        with patch('rattail.commands.getpass') as getpass:
 
            getpass.return_value = 'fredpass'
 
            commands.main('adduser', '--no-init', '--stdout', self.stdout_path, 'fred')
 
        with open(self.stdout_path) as f:
 
            self.assertEqual(f.read(), "Created user: fred\n")
 
        fred = self.session.query(model.User).one()
 
        self.assertEqual(fred.username, 'fred')
 
        self.assertEqual(len(fred.roles), 0)
 
        user = authenticate_user(self.session, 'fred', 'fredpass')
 
        self.assertTrue(user is fred)
 

	
 
    def test_admin_user_created_with_administrator_role(self):
 
        self.assertEqual(self.session.query(model.User).count(), 0)
 
        with patch('rattail.commands.getpass') as getpass:
 
            getpass.return_value = 'fredpass'
 
            commands.main('adduser', '--no-init', '--stdout', self.stdout_path, 'fred', '--administrator')
 
        fred = self.session.query(model.User).one()
 
        self.assertEqual(len(fred.roles), 1)
 
        self.assertEqual(fred.roles[0].name, 'Administrator')
 

	
 

	
 
class TestDatabaseSync(TestCase):
 

	
 
    @patch('rattail.db.sync.linux.start_daemon')
 
    def test_start_daemon_with_default_args(self, start_daemon):
 
        commands.main('dbsync', '--no-init', 'start')
 
        start_daemon.assert_called_once_with(None, None, True)
 

	
 
    @patch('rattail.db.sync.linux.start_daemon')
 
    def test_start_daemon_with_explicit_args(self, start_daemon):
 
        tmp = TempIO()
 
        pid_path = tmp.putfile('test.pid', '')
 
        commands.main('dbsync', '--no-init', '--pidfile', pid_path, '--do-not-daemonize', 'start')
 
        start_daemon.assert_called_once_with(None, pid_path, False)
 

	
 
    @patch('rattail.db.sync.linux.start_daemon')
 
    def test_keyboard_interrupt_raises_error_when_daemonized(self, start_daemon):
 
        start_daemon.side_effect = KeyboardInterrupt
 
        self.assertRaises(KeyboardInterrupt, commands.main, 'dbsync', '--no-init', 'start')
 

	
 
    @patch('rattail.db.sync.linux.start_daemon')
 
    def test_keyboard_interrupt_handled_gracefully_when_not_daemonized(self, start_daemon):
 
        tmp = TempIO()
 
        stderr_path = tmp.putfile('stderr.txt', '')
 
        start_daemon.side_effect = KeyboardInterrupt
 
        commands.main('dbsync', '--no-init', '--stderr', stderr_path, '--do-not-daemonize', 'start')
 
        with open(stderr_path) as f:
 
            self.assertEqual(f.read(), "Interrupted.\n")
 

	
 
    @patch('rattail.db.sync.linux.stop_daemon')
 
    def test_stop_daemon_with_default_args(self, stop_daemon):
 
        commands.main('dbsync', '--no-init', 'stop')
 
        stop_daemon.assert_called_once_with(None, None)
 

	
 
    @patch('rattail.db.sync.linux.stop_daemon')
 
    def test_stop_daemon_with_explicit_args(self, stop_daemon):
 
        tmp = TempIO()
 
        pid_path = tmp.putfile('test.pid', '')
 
        commands.main('dbsync', '--no-init', '--pidfile', pid_path, 'stop')
 
        stop_daemon.assert_called_once_with(None, pid_path)
 

	
 

	
 
class TestDump(DataTestCase):
 

	
 
    def setUp(self):
 
        super(TestDump, self).setUp()
 
        self.session.add(model.Product(upc='074305001321'))
 
        self.session.add(model.Product(upc='074305001161'))
 
        self.session.commit()
 

	
 
    def test_unknown_model_cannot_be_dumped(self):
 
        tmp = TempIO()
 
        stderr_path = tmp.putfile('stderr.txt', '')
 
        self.assertRaises(SystemExit, commands.main, '--no-init', '--stderr', stderr_path, 'dump', 'NoSuchModel')
 
        with open(stderr_path) as f:
 
            self.assertEqual(f.read(), "Unknown model: NoSuchModel\n")
 

	
 
    def test_dump_goes_to_stdout_by_default(self):
 
        tmp = TempIO()
 
        stdout_path = tmp.putfile('stdout.txt', '')
 
        commands.main('--no-init', '--stdout', stdout_path, 'dump', 'Product')
 
        with open(stdout_path, 'rb') as csv_file:
 
            reader = csv.DictReader(csv_file)
 
            upcs = [row['upc'] for row in reader]
 
        self.assertEqual(len(upcs), 2)
 
        self.assertTrue('00074305001321' in upcs)
 
        self.assertTrue('00074305001161' in upcs)
 

	
 
    def test_dump_goes_to_file_if_so_invoked(self):
 
        tmp = TempIO()
 
        output_path = tmp.putfile('output.txt', '')
 
        commands.main('--no-init', 'dump', 'Product', '--output', output_path)
 
        with open(output_path, 'rb') as csv_file:
 
            reader = csv.DictReader(csv_file)
 
            upcs = [row['upc'] for row in reader]
 
        self.assertEqual(len(upcs), 2)
 
        self.assertTrue('00074305001321' in upcs)
 
        self.assertTrue('00074305001161' in upcs)
 

	
 

	
 
class TestFileMonitor(TestCase):
 

	
 
    @patch('rattail.filemon.linux.start_daemon')
 
    def test_start_daemon_with_default_args(self, start_daemon):
 
        commands.main('filemon', '--no-init', 'start')
 
        start_daemon.assert_called_once_with(None, None, True)
 

	
 
    @patch('rattail.filemon.linux.start_daemon')
 
    def test_start_daemon_with_explicit_args(self, start_daemon):
 
        tmp = TempIO()
 
        pid_path = tmp.putfile('test.pid', '')
 
        commands.main('filemon', '--no-init', '--pidfile', pid_path, '--do-not-daemonize', 'start')
 
        start_daemon.assert_called_once_with(None, pid_path, False)
 

	
 
    @patch('rattail.filemon.linux.stop_daemon')
 
    def test_stop_daemon_with_default_args(self, stop_daemon):
 
        commands.main('filemon', '--no-init', 'stop')
 
        stop_daemon.assert_called_once_with(None, None)
 

	
 
    @patch('rattail.filemon.linux.stop_daemon')
 
    def test_stop_daemon_with_explicit_args(self, stop_daemon):
 
        tmp = TempIO()
 
        pid_path = tmp.putfile('test.pid', '')
 
        commands.main('filemon', '--no-init', '--pidfile', pid_path, 'stop')
 
        stop_daemon.assert_called_once_with(None, pid_path)
 

	
 
    @patch('rattail.commands.sys')
 
    def test_unknown_platform_not_supported(self, sys):
 
        tmp = TempIO()
 
        stderr_path = tmp.putfile('stderr.txt', '')
 
        sys.platform = 'bogus'
 
        commands.main('--no-init', '--stderr', stderr_path, 'filemon', 'start')
 
        sys.exit.assert_called_once_with(1)
 
        with open(stderr_path) as f:
 
            self.assertEqual(f.read(), "File monitor is not supported on platform: bogus\n")
 

	
 

	
 
# # TODO: The purge-batches command tests don't work yet; the db.batches.util
 
# # tests need to be figured out first...
 
# class TestPurgeBatches(DataTestCase):
 

	
 
#     def setUp(self):
 
#         super(TestPurgeBatches, self).setUp()
 
#         self.session.add(model.Batch(purge=datetime.date(2014, 1, 1)))
 
#         self.session.add(model.Batch(purge=datetime.date(2014, 2, 1)))
 
#         self.session.add(model.Batch(purge=datetime.date(2014, 3, 1)))
 
#         self.session.commit()
 
#         self.tmp = TempIO()
 
#         self.stdout_path = self.tmp.putfile('stdout.txt', '')
 

	
 
#     def test_purging_honors_batch_purge_dates(self):
 
#         self.assertEqual(self.session.query(model.Batch).count(), 3)
 
#         commands.main('--no-init', '--stdout', self.stdout_path, 'purge-batches', '--date', '2014-01-15')
 
#         self.assertEqual(self.session.query(model.Batch).count(), 2)
 
#         self.assertEqual(self.session.query(func.min(model.Batch.purge)).scalar(), datetime.date(2014, 2, 1))
 
#         with open(self.stdout_path) as f:
 
#             self.assertTrue(f.read().endswith("\nPurged 1 normal and 0 orphaned batches.\n"))
 

	
 
#     def test_specifying_all_purges_everything(self):
 
#         self.assertEqual(self.session.query(model.Batch).count(), 3)
 
#         commands.main('--no-init', '--stdout', self.stdout_path, 'purge-batches', '--all')
 
#         self.assertEqual(self.session.query(model.Batch).count(), 0)
 
#         with open(self.stdout_path) as f:
 
#             self.assertTrue(f.read().endswith("\nPurged 3 normal and 0 orphaned batches.\n"))
 

	
 
#     def test_orphaned_tables_are_also_purged(self):
 
#         self.session.delete(self.session.query(model.Batch).first())
 
#         self.session.commit()
 
#         self.assertEqual(self.session.query(model.Batch).count(), 2)
 
#         commands.main('--no-init', '--stdout', self.stdout_path, 'purge-batches', '--date', '2013-12-31')
 
#         self.assertEqual(self.session.query(model.Batch).count(), 2)
 
#         with open(self.stdout_path) as f:
 
#             self.assertTrue(f.read().endswith("\nPurged 0 normal and 1 orphaned batches.\n"))
0 comments (0 inline, 0 general)