Changeset - 8ac15ea47029
Lance Edgar (lance) - 2018-01-06 19:19:20
lance@edbob.org
Add first attempt for "importer as batch" feature
8 files changed with 580 insertions and 20 deletions:
rattail/batch/importer.py
new file 100644
# -*- coding: utf-8; -*-
################################################################################
#
#  Rattail -- Retail Software Framework
#  Copyright © 2010-2018 Lance Edgar
#
#  This file is part of Rattail.
#
#  Rattail is free software: you can redistribute it and/or modify it under the
#  terms of the GNU General Public License as published by the Free Software
#  Foundation, either version 3 of the License, or (at your option) any later
#  version.
#
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
#  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
#  details.
#
#  You should have received a copy of the GNU General Public License along with
#  Rattail.  If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Handler for importer batches
"""

from __future__ import unicode_literals, absolute_import

import logging

import six
import sqlalchemy as sa
from sqlalchemy import orm

from rattail.db import model
from rattail.batch import BatchHandler
from rattail.util import load_object


log = logging.getLogger(__name__)


class ImporterBatchHandler(BatchHandler):
    """
    Handler for importer batches.
    """
    batch_model_class = model.ImporterBatch

    def execute(self, batch, user=None, progress=None, **kwargs):
        session = orm.object_session(batch)

        # reflect the batch's dynamic row table from the 'batch' schema
        metadata = sa.MetaData(schema='batch', bind=session.bind)
        row_table = sa.Table(batch.row_table, metadata, autoload=True)

        # load the underlying import handler and importer, which do the real work
        handler = load_object(batch.import_handler_spec)(config=self.config)
        handler.runas_user = user
        handler.setup()
        handler.begin_transaction()

        importer = handler.get_importer(batch.importer_key)
        assert importer
        importer.setup()

        def process(row, i):

            if row.status_code == self.enum.IMPORTER_BATCH_ROW_STATUS_CREATE:
                # rebuild host data from the "key_" and "post_" columns
                host_data = {}
                for col in row_table.c:
                    if col.name.startswith('key_'):
                        field = col.name[4:]
                        host_data[field] = getattr(row, col.name)
                    elif col.name.startswith('post_'):
                        field = col.name[5:]
                        host_data[field] = getattr(row, col.name)
                key = importer.get_key(host_data)
                local_object = importer.create_object(key, host_data)
                log.debug("created new %s %s: %s", importer.model_name, key, local_object)

            elif row.status_code == self.enum.IMPORTER_BATCH_ROW_STATUS_UPDATE:
                # rebuild host data ("key_" / "post_") and local data ("key_" / "pre_")
                host_data = {}
                local_data = {}
                for col in row_table.c:
                    if col.name.startswith('key_'):
                        field = col.name[4:]
                        host_data[field] = getattr(row, col.name)
                        local_data[field] = getattr(row, col.name)
                    elif col.name.startswith('pre_'):
                        field = col.name[4:]
                        local_data[field] = getattr(row, col.name)
                    elif col.name.startswith('post_'):
                        field = col.name[5:]
                        host_data[field] = getattr(row, col.name)
                key = importer.get_key(host_data)
                local_object = importer.get_local_object(key)
                local_object = importer.update_object(local_object, host_data, local_data)
                log.debug("updated %s %s: %s", importer.model_name, key, local_object)

            elif row.status_code == self.enum.IMPORTER_BATCH_ROW_STATUS_DELETE:
                # the key columns alone identify the object to delete
                host_data = {}
                for col in row_table.c:
                    if col.name.startswith('key_'):
                        field = col.name[4:]
                        host_data[field] = getattr(row, col.name)
                key = importer.get_key(host_data)
                local_object = importer.get_local_object(key)
                text = six.text_type(local_object)
                importer.delete_object(local_object)
                log.debug("deleted %s %s: %s", importer.model_name, key, text)

        rows = session.query(row_table)
        self.progress_loop(process, rows, progress,
                           message="Executing import / export batch")

        importer.teardown()
        handler.teardown()
        handler.commit_transaction()
        return True
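
For context, here is a minimal sketch of how this handler might be driven once a batch exists.  The config/session/batch lookups are illustrative only and assume a fully configured Rattail environment; none of these calls are part of this changeset:

    from rattail.config import make_config
    from rattail.db import Session, model
    from rattail.batch.importer import ImporterBatchHandler

    config = make_config()          # assumes a standard Rattail config file
    session = Session()             # assumes Session is bound to the Rattail DB
    batch = session.query(model.ImporterBatch).first()  # hypothetical lookup
    handler = ImporterBatchHandler(config)
    handler.execute(batch, user=batch.created_by)
    session.commit()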
rattail/commands/importing.py
@@ -103,12 +103,17 @@ class ImportSubcommand(Subcommand):
        except NotImplementedError:
            pass
        else:
            doc += "  Supported models are: ({})".format(', '.join(handler.get_importer_keys()))
        parser.add_argument('models', nargs='*', metavar='MODEL', help=doc)

        # make batches
        parser.add_argument('--make-batches', action='store_true',
                            help="If specified, make new Import / Export Batches instead of "
                            "performing an actual (possibly dry-run) import.")

        # fields / exclude
        parser.add_argument('--fields',
                            help="List of fields which should be included in the import.  "
                            "If this parameter is specified, then any field not listed here "
                            "would be *excluded*, regardless of the --exclude-fields parameter.")
        parser.add_argument('--exclude-fields',
@@ -157,12 +162,13 @@ class ImportSubcommand(Subcommand):
        # max total changes, per model
        parser.add_argument('--max-total', type=int, metavar='COUNT',
                            help="Maximum number of record changes of *any* type which may occur, "
                            "after which a given import task should stop.  Note that this applies "
                            "on a per-model basis, not overall.")

        # TODO: deprecate --batch, replace with --batch-size ?
        # batch size
        parser.add_argument('--batch', type=int, dest='batch_size', metavar='SIZE', default=200,
                            help="Split work to be done into batches, with the specified number of "
                            "records in each batch.  Or, set this to 0 (zero) to disable batching.  "
                            "Implementation for this may vary somewhat between importers; default "
                            "batch size is 200 records.")
@@ -174,26 +180,28 @@ class ImportSubcommand(Subcommand):
                            "behavior of this flag is ultimately up to the import handler, but the "
                            "default is to send an email notification.")

        # dry run?
        parser.add_argument('--dry-run', action='store_true',
                            help="Go through the full motions and allow logging etc. to "
                            "occur, but rollback (abort) the transaction at the end.  "
                            "Note that this flag is ignored if --make-batches is specified.")

    def run(self, args):
        log.info("begin `%s %s` for data models: %s",
                 self.parent_name,
                 self.name,
                 ', '.join(args.models) if args.models else "(ALL)")

        handler = self.get_handler(args=args)
        models = args.models or handler.get_default_keys()
        log.debug("using handler: {}".format(handler))
        log.debug("importing models: {}".format(models))
        log.debug("args are: {}".format(args))

        kwargs = {
            'dry_run': args.dry_run,
            'warnings': args.warnings,
            'fields': parse_list(args.fields),
            'exclude_fields': parse_list(args.exclude_fields),
            'create': args.create,
            'max_create': args.max_create,
            'update': args.update,
@@ -201,13 +209,22 @@ class ImportSubcommand(Subcommand):
            'delete': args.delete,
            'max_delete': args.max_delete,
            'max_total': args.max_total,
            'progress': self.progress,
            'args': args,
        }

        if args.make_batches:
            kwargs.update({
                'runas_user': self.get_runas_user(),
            })
            handler.make_batches(*models, **kwargs)
        else:
            kwargs.update({
                'dry_run': args.dry_run,
            })
            handler.import_data(*models, **kwargs)

        # TODO: should this logging happen elsewhere / be customizable?
        if args.dry_run:
            log.info("dry run, so transaction was rolled back")
        else:
            log.info("transaction was committed")
rattail/db/alembic/versions/b82daacc86b7_add_importer_batch.py
new file 100644
# -*- coding: utf-8; -*-
"""add importer batch

Revision ID: b82daacc86b7
Revises: 0c91cf7d557b
Create Date: 2017-12-29 00:16:43.897114

"""

from __future__ import unicode_literals, absolute_import

# revision identifiers, used by Alembic.
revision = 'b82daacc86b7'
down_revision = u'0c91cf7d557b'
branch_labels = None
depends_on = None

from alembic import op
import sqlalchemy as sa
import rattail.db.types


def upgrade():

    # batch_importer
    op.create_table('batch_importer',
                    sa.Column('uuid', sa.String(length=32), nullable=False),
                    sa.Column('id', sa.Integer(), nullable=False),
                    sa.Column('description', sa.String(length=255), nullable=True),
                    sa.Column('created', sa.DateTime(), nullable=False),
                    sa.Column('created_by_uuid', sa.String(length=32), nullable=False),
                    sa.Column('cognized', sa.DateTime(), nullable=True),
                    sa.Column('cognized_by_uuid', sa.String(length=32), nullable=True),
                    sa.Column('rowcount', sa.Integer(), nullable=True),
                    sa.Column('complete', sa.Boolean(), nullable=True),
                    sa.Column('executed', sa.DateTime(), nullable=True),
                    sa.Column('executed_by_uuid', sa.String(length=32), nullable=True),
                    sa.Column('purge', sa.Date(), nullable=True),
                    sa.Column('notes', sa.Text(), nullable=True),
                    sa.Column('status_code', sa.Integer(), nullable=True),
                    sa.Column('status_text', sa.String(length=255), nullable=True),
                    sa.Column('row_table', sa.String(length=255), nullable=False),
                    sa.Column('batch_handler_spec', sa.String(length=255), nullable=True),
                    sa.Column('import_handler_spec', sa.String(length=255), nullable=False),
                    sa.Column('host_title', sa.String(length=255), nullable=False),
                    sa.Column('local_title', sa.String(length=255), nullable=False),
                    sa.Column('importer_key', sa.String(length=100), nullable=False),
                    sa.ForeignKeyConstraint(['cognized_by_uuid'], [u'user.uuid'], name=u'batch_importer_fk_cognized_by'),
                    sa.ForeignKeyConstraint(['created_by_uuid'], [u'user.uuid'], name=u'batch_importer_fk_created_by'),
                    sa.ForeignKeyConstraint(['executed_by_uuid'], [u'user.uuid'], name=u'batch_importer_fk_executed_by'),
                    sa.PrimaryKeyConstraint('uuid')
    )


def downgrade():

    # batch_importer
    op.drop_table('batch_importer')
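
As with any Alembic revision, this would typically be applied via `alembic upgrade head` against the Rattail database.  Note that this table holds only batch headers; each batch's row data lives in its own dynamically-created table (see the handler changes elsewhere in this changeset).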
rattail/db/model/__init__.py
@@ -56,12 +56,13 @@ from .bouncer import EmailBounce
from .tempmon import TempmonClient, TempmonProbe, TempmonReading
from .upgrades import Upgrade, UpgradeRequirement

from .exports import ExportMixin
from .reports import ReportOutput
from .batch import BatchMixin, BaseFileBatchMixin, FileBatchMixin, BatchRowMixin, ProductBatchRowMixin
from .batch.dynamic import DynamicBatchMixin, ImporterBatch
from .batch.handheld import HandheldBatch, HandheldBatchRow
from .batch.inventory import InventoryBatch, InventoryBatchRow
from .batch.labels import LabelBatch, LabelBatchRow
from .batch.pricing import PricingBatch, PricingBatchRow
from .batch.purchase import PurchaseBatch, PurchaseBatchRow, PurchaseBatchCredit
from .batch.vendorcatalog import VendorCatalog, VendorCatalogRow
rattail/db/model/batch/dynamic.py
new file 100644
# -*- coding: utf-8; -*-
################################################################################
#
#  Rattail -- Retail Software Framework
#  Copyright © 2010-2018 Lance Edgar
#
#  This file is part of Rattail.
#
#  Rattail is free software: you can redistribute it and/or modify it under the
#  terms of the GNU General Public License as published by the Free Software
#  Foundation, either version 3 of the License, or (at your option) any later
#  version.
#
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
#  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
#  details.
#
#  You should have received a copy of the GNU General Public License along with
#  Rattail.  If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Dynamic Batches
"""

from __future__ import unicode_literals, absolute_import

import sqlalchemy as sa

from rattail.db.model import Base, BatchMixin


class DynamicBatchMixin(BatchMixin):
    """
    Mixin for all dynamic batch (header) classes.
    """

    row_table = sa.Column(sa.String(length=255), nullable=False, doc="""
    Name of the row data table for the batch.  This will typically be a UUID,
    and the table will exist within the 'batch' schema in the PostgreSQL DB.
    """)

    # TODO: should nullable be False?
    batch_handler_spec = sa.Column(sa.String(length=255), nullable=True, doc="""
    Object spec for the batch handler.
    """)


class ImporterBatch(DynamicBatchMixin, Base):
    """
    Dynamic batch for use with arbitrary data importers.
    """
    __tablename__ = 'batch_importer'
    batch_key = 'importer'

    import_handler_spec = sa.Column(sa.String(length=255), nullable=False, doc="""
    Object spec for the import handler.
    """)

    host_title = sa.Column(sa.String(length=255), nullable=False, doc="""
    Host title for the import handler.
    """)

    local_title = sa.Column(sa.String(length=255), nullable=False, doc="""
    Local title for the import handler.
    """)

    importer_key = sa.Column(sa.String(length=100), nullable=False, doc="""
    Importer "key" - must be valid within the context of the import handler.
    """)
rattail/enum.py
@@ -91,12 +91,25 @@ HANDHELD_DEVICE_TYPE_PALMOS             = 'palmos'
HANDHELD_DEVICE_TYPE = {
    HANDHELD_DEVICE_TYPE_MOTOROLA       : "Motorola",
    HANDHELD_DEVICE_TYPE_PALMOS         : "PalmOS",
}


IMPORTER_BATCH_ROW_STATUS_NOCHANGE      = 0
IMPORTER_BATCH_ROW_STATUS_CREATE        = 1
IMPORTER_BATCH_ROW_STATUS_UPDATE        = 2
IMPORTER_BATCH_ROW_STATUS_DELETE        = 3

IMPORTER_BATCH_ROW_STATUS = {
    IMPORTER_BATCH_ROW_STATUS_NOCHANGE  : "no change",
    IMPORTER_BATCH_ROW_STATUS_CREATE    : "create",
    IMPORTER_BATCH_ROW_STATUS_UPDATE    : "update",
    IMPORTER_BATCH_ROW_STATUS_DELETE    : "delete",
}


INVENTORY_MODE_REPLACE          = 1
INVENTORY_MODE_REPLACE_ADJUST   = 2
INVENTORY_MODE_ADJUST           = 3
INVENTORY_MODE_ZERO_ALL         = 4

INVENTORY_MODE = {
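
A quick example of reading the new status mapping for display purposes; in practice the enum module is usually obtained via `config.get_enum()` rather than imported directly:

    from rattail import enum

    label = enum.IMPORTER_BATCH_ROW_STATUS[enum.IMPORTER_BATCH_ROW_STATUS_UPDATE]
    assert label == "update"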
rattail/importing/handlers.py
# -*- coding: utf-8; -*-
################################################################################
#
#  Rattail -- Retail Software Framework
#  Copyright © 2010-2018 Lance Edgar
#
#  This file is part of Rattail.
#
#  Rattail is free software: you can redistribute it and/or modify it under the
#  terms of the GNU General Public License as published by the Free Software
#  Foundation, either version 3 of the License, or (at your option) any later
@@ -28,15 +28,17 @@ from __future__ import unicode_literals, absolute_import

import sys
import logging

import six
import humanize
import sqlalchemy as sa

from rattail.core import get_uuid
from rattail.time import make_utc
from rattail.util import OrderedDict, get_object_spec, progress_loop
from rattail.mail import send_email


log = logging.getLogger(__name__)


@@ -99,12 +101,218 @@ class ImportHandler(object):
        """
        Return a dict of kwargs to be used when constructing an importer with
        the given key.
        """
        return kwargs

    def make_batches(self, *keys, **kwargs):
        """
        Make a new import/export batch for each specified model key.
        """
        from rattail.db import Session

        session = Session()
        model = self.config.get_model()
        metadata = sa.MetaData(schema='batch', bind=session.bind)
        user = session.merge(kwargs['runas_user'])
        handler_spec = self.config.get('rattail.batch', 'importer.handler',
                                       default='rattail.batch.importer:ImporterBatchHandler')

        self.progress = kwargs.pop('progress', getattr(self, 'progress', None))

        self.setup()
        self.begin_transaction()

        for key in keys:

            importer = self.get_importer(key, **kwargs)
            if importer and importer.batches_supported:
                log.info("making batch for model: %s", key)
                importer._handler_key = key

                batch = model.ImporterBatch()
                batch.uuid = get_uuid()
                batch.created_by = user
                batch.batch_handler_spec = handler_spec
                batch.import_handler_spec = get_object_spec(self)
                batch.host_title = self.host_title
                batch.local_title = self.local_title
                batch.importer_key = key
                batch.rowcount = 0

                batch.description = "{} -> {} for {}".format(
                    batch.host_title,
                    batch.local_title,
                    batch.importer_key)

                # row data lives in a dedicated table, named by the batch uuid
                batch.row_table = batch.uuid

                session.add(batch)
                session.flush()

                row_table = self.make_row_table(metadata, importer, batch)
                self.populate_row_table(session, importer, batch, row_table)

            elif importer:
                log.info("batches not supported for importer: %s", key)

            else:
                log.warning("skipping unknown model: %s", key)

        self.teardown()
        # TODO: necessary?
        # self.rollback_transaction()
        # self.commit_transaction()

        session.commit()
        session.close()

    def make_row_table(self, metadata, importer, batch):
        columns = [
            sa.Column('uuid', sa.String(length=32), nullable=False, primary_key=True),
            sa.Column('sequence', sa.Integer(), nullable=False),
            sa.Column('object_key', sa.String(length=255), nullable=False, default=''),
            sa.Column('object_str', sa.String(length=255), nullable=False, default=''),
        ]

        for field in importer.fields:
            typ = importer.field_coltypes.get(field)

            if not typ and importer.model_class:
                mapper = sa.inspect(importer.model_class)
                if mapper.has_property(field):
                    prop = mapper.get_property(field)
                    if prop:
                        assert len(prop.columns) == 1, "multiple columns ({}) unsupported: {}.{}".format(
                            len(prop.columns), batch.importer_key, field)
                        typ = prop.columns[0].type

            if not typ:
                typ = sa.String(length=255)

            if field in importer.key:
                columns.append(sa.Column('key_{}'.format(field), typ))
            else:
                for prefix in ('pre', 'post'):
                    columns.append(sa.Column('{}_{}'.format(prefix, field), typ))

        columns.extend([
            sa.Column('status_code', sa.Integer(), nullable=False),
            sa.Column('status_text', sa.String(length=255), nullable=True),
        ])

        row_table = sa.Table(batch.row_table, metadata, *columns)
        row_table.create()
        return row_table

    def populate_row_table(self, session, importer, batch, row_table):
        importer.now = make_utc(tzinfo=True)
        importer.setup()

        # obtain host data
        host_data = importer.normalize_host_data()
        host_data, unique = importer.unique_data(host_data)
        if not host_data:
            return

        # cache local data if appropriate
        if importer.caches_local_data:
            importer.cached_local_data = importer.cache_local_data(host_data)

        # create and/or update
        if importer.create or importer.update:
            self._populate_create_update(session, importer, batch, row_table, host_data)

        # delete
        if importer.delete:
            self._populate_delete(session, importer, batch, row_table, host_data, set(unique))

    def _populate_delete(self, session, importer, batch, row_table, host_data, host_keys):
        deleting = importer.get_deletion_keys() - host_keys

        def delete(key, i):
            cached = importer.cached_local_data.pop(key)
            local_data = cached['data']
            local_data['_object_str'] = six.text_type(cached['object'])
            sequence = batch.rowcount + 1
            self.make_batch_row(session, importer, row_table, sequence, None, local_data,
                                status_code=self.enum.IMPORTER_BATCH_ROW_STATUS_DELETE)
            batch.rowcount += 1

        progress_loop(delete, sorted(deleting), self.progress,
                      message="Deleting {} data".format(importer.model_name))

    def _populate_create_update(self, session, importer, batch, row_table, data):

        def record(host_data, i):

            # fetch local object, using key from host data
            key = importer.get_key(host_data)
            local_object = importer.get_local_object(key)
            status_code = self.enum.IMPORTER_BATCH_ROW_STATUS_NOCHANGE
            status_text = None
            make_row = False

            # if we have a local object, but its data differs from host, make an update record
            if local_object and importer.update:
                make_row = True
                local_data = importer.normalize_local_object(local_object)
                diffs = importer.data_diffs(local_data, host_data)
                if diffs:
                    status_code = self.enum.IMPORTER_BATCH_ROW_STATUS_UPDATE
                    status_text = ','.join(diffs)

            # if we did not yet have a local object, make a create record
            elif not local_object and importer.create:
                make_row = True
                local_data = None
                status_code = self.enum.IMPORTER_BATCH_ROW_STATUS_CREATE

            if make_row:
                sequence = batch.rowcount + 1
                self.make_batch_row(session, importer, row_table, sequence, host_data, local_data,
                                    status_code=status_code, status_text=status_text)
                batch.rowcount += 1

        progress_loop(record, data, self.progress,
                      message="Populating batch for {}".format(importer._handler_key))

    def make_batch_row(self, session, importer, row_table, sequence, host_data, local_data, status_code=None, status_text=None):
        values = {
            'uuid': get_uuid(),
            'sequence': sequence,
            'object_str': '',
            'status_code': status_code,
            'status_text': status_text,
        }

        if host_data:
            if '_object_str' in host_data:
                values['object_str'] = host_data['_object_str']
            elif '_host_object' in host_data:
                values['object_str'] = six.text_type(host_data['_host_object'])
            values['object_key'] = ','.join([host_data[f] for f in importer.key])
        elif local_data:
            if '_object_str' in local_data:
                values['object_str'] = local_data['_object_str']
            elif '_object' in local_data:
                values['object_str'] = six.text_type(local_data['_object'])
            values['object_key'] = ','.join([local_data[f] for f in importer.key])

        for field in importer.fields:
            if field in importer.key:
                data = host_data or local_data
                values['key_{}'.format(field)] = data[field]
            else:
                if host_data and field in host_data:
                    values['post_{}'.format(field)] = host_data[field]
                if local_data and field in local_data:
                    values['pre_{}'.format(field)] = local_data[field]

        session.execute(row_table.insert(values))

    def import_data(self, *keys, **kwargs):
        """
        Import all data for the given importer/model keys.
        """
        self.import_began = make_utc(tzinfo=True)
        if 'dry_run' in kwargs:
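
To make the key_/pre_/post_ column scheme concrete, here is a standalone toy version of the layout make_row_table() would generate for a hypothetical importer with key ('uuid',) and fields ('uuid', 'name'); the table name and fields are illustrative only:

    import sqlalchemy as sa

    metadata = sa.MetaData()
    row_table = sa.Table(
        'example_row_table', metadata,
        sa.Column('uuid', sa.String(length=32), nullable=False, primary_key=True),
        sa.Column('sequence', sa.Integer(), nullable=False),
        sa.Column('object_key', sa.String(length=255), nullable=False, default=''),
        sa.Column('object_str', sa.String(length=255), nullable=False, default=''),
        sa.Column('key_uuid', sa.String(length=255)),   # key field gets a single column
        sa.Column('pre_name', sa.String(length=255)),   # local value, before import
        sa.Column('post_name', sa.String(length=255)),  # host value, to be applied
        sa.Column('status_code', sa.Integer(), nullable=False),
        sa.Column('status_text', sa.String(length=255), nullable=True),
    )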
rattail/importing/importers.py
@@ -79,12 +79,24 @@ class Importer(object):
    caches_local_data = False
    cached_local_data = None

    host_system_title = None
    local_system_title = None

    # TODO
    # Whether or not the registered "importer" batch handler is able to handle
    # batches for this importer (and/or, whether this importer is able to
    # provide what's needed for the same).
    batches_supported = False

    # TODO
    # If ``batches_supported`` is true, this should contain SQLAlchemy
    # ``Column`` instance overrides, keyed by fieldname.  Any field not
    # represented here will be given the default column type (string).
    field_coltypes = {}

    def __init__(self, config=None, key=None, fields=None, exclude_fields=None, **kwargs):
        self.config = config
        self.enum = config.get_enum() if config else None
        self.model_class = kwargs.pop('model_class', self.get_model_class())
        if key is not None:
            self.key = key
@@ -136,12 +148,26 @@ class Importer(object):
        """

    def progress_loop(self, func, items, factory=None, **kwargs):
        factory = factory or self.progress
        return progress_loop(func, items, factory, **kwargs)

    def unique_data(self, host_data):
        # Prune duplicate keys from host/source data.  This is for the sake of
        # sanity, since duplicates typically lead to a ping-pong effect where a
        # "clean" (change-less) import is impossible.
        unique = OrderedDict()
        for data in host_data:
            key = self.get_key(data)
            if key in unique:
                log.warning("duplicate records detected from {} for key: {}".format(
                    self.host_system_title, key))
            else:
                unique[key] = data
        return list(unique.values()), unique

    def import_data(self, host_data=None, now=None, **kwargs):
        """
        Import some data!  This is the core body of logic for that, regardless
        of where data is coming from or where it's headed.  Note that this
        method handles deletions as well as adds/updates.
        """
@@ -153,25 +179,13 @@ class Importer(object):
        updated = []
        deleted = []

        # Get complete set of normalized host data.
        if host_data is None:
            host_data = self.normalize_host_data()

        host_data, unique = self.unique_data(host_data)

        # Cache local data if appropriate.
        if self.caches_local_data:
            self.cached_local_data = self.cache_local_data(host_data)

        # Create and/or update data.
@@ -248,12 +262,73 @@ class Importer(object):
            self.progress_loop(import_, data, message="Importing {} data".format(self.model_name))
        except ImportLimitReached:
            pass
        self.flush_create_update_final()
        return created, updated

    # def _populate_create_update(self, row_table, data):
    #     """
    #     Populate create and/or update records for the given batch row table,
    #     according to the given host data set.
    #     """
    #     created = []
    #     updated = []
    #     # count = len(data)
    #     # if not count:
    #     #     return created, updated

    #     def record(host_data, i):

    #         # fetch local object, using key from host data
    #         key = self.get_key(host_data)
    #         local_object = self.get_local_object(key)

    #         # if we have a local object, but its data differs from host, make an update record
    #         if local_object and self.update:
    #             local_data = self.normalize_local_object(local_object)
    #             diffs = self.data_diffs(local_data, host_data)
    #             if diffs:
    #                 log.debug("fields '{}' differed for local data: {}, host data: {}".format(
    #                     ','.join(diffs), local_data, host_data))
    #                 local_object = self.update_object(local_object, host_data, local_data)
    #                 updated.append((local_object, local_data, host_data))
    #                 if self.max_update and len(updated) >= self.max_update:
    #                     log.warning("max of {} *updated* records has been reached; stopping now".format(self.max_update))
    #                     raise ImportLimitReached()
    #                 if self.max_total and (len(created) + len(updated)) >= self.max_total:
    #                     log.warning("max of {} *total changes* has been reached; stopping now".format(self.max_total))
    #                     raise ImportLimitReached()

    #         # if we did not yet have a local object, make a create record
    #         elif not local_object and self.create:
    #             local_object = self.create_object(key, host_data)
    #             if local_object:
    #                 log.debug("created new {} {}: {}".format(self.model_name, key, local_object))
    #                 created.append((local_object, host_data))
    #                 if self.caches_local_data and self.cached_local_data is not None:
    #                     self.cached_local_data[key] = {'object': local_object, 'data': self.normalize_local_object(local_object)}
    #                 if self.max_create and len(created) >= self.max_create:
    #                     log.warning("max of {} *created* records has been reached; stopping now".format(self.max_create))
    #                     raise ImportLimitReached()
    #                 if self.max_total and (len(created) + len(updated)) >= self.max_total:
    #                     log.warning("max of {} *total changes* has been reached; stopping now".format(self.max_total))
    #                     raise ImportLimitReached()
    #             else:
    #                 log.debug("did NOT create new {} for key: {}".format(self.model_name, key))

    #         # flush changes every so often
    #         if not self.batch_size or (len(created) + len(updated)) % self.batch_size == 0:
    #             self.flush_create_update()

    #     try:
    #         self.progress_loop(record, data, message="Importing {} data".format(self.model_name))
    #     except ImportLimitReached:
    #         pass
    #     # self.flush_create_update_final()
    #     return created, updated

    def flush_create_update(self):
        """
        Perform any steps necessary to "flush" the create/update changes which
        have occurred thus far in the import.
        """
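
Finally, a sketch of an importer opting in to the new batch support, using the attributes added above; WidgetImporter and its fields are hypothetical:

    import sqlalchemy as sa
    from rattail.importing.importers import Importer

    class WidgetImporter(Importer):
        model_name = 'Widget'
        key = ('uuid',)
        fields = ('uuid', 'name', 'quantity')
        batches_supported = True
        field_coltypes = {
            'quantity': sa.Integer(),   # override the default String(255) column
        }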