Changeset - 263dafef0c04
Lance Edgar (lance) - 2022-04-06 16:22:21
lance@edbob.org
Add more error handling to datasync; more docs

still trying to isolate a particular problem for one install, to
determine how it can be worked around...
8 files changed with 345 insertions and 109 deletions:
docs/api/index.rst
@@ -30,8 +30,6 @@ attributes and method signatures etc.
    rattail/csvutil
    rattail/custorders
    rattail/datasync/index
-   rattail/datasync/consumers
-   rattail/datasync/rattail
    rattail/db/index
    rattail/employment
    rattail/enum
docs/api/rattail/datasync/config.rst
new file 100644
@@ -0,0 +1,5 @@
+``rattail.datasync.config``
+===========================
+
+.. automodule:: rattail.datasync.config
+   :members:
docs/api/rattail/datasync/daemon.rst
new file 100644
@@ -0,0 +1,5 @@
+``rattail.datasync.daemon``
+===========================
+
+.. automodule:: rattail.datasync.daemon
+   :members:
docs/api/rattail/datasync/index.rst
@@ -7,5 +7,8 @@
 .. toctree::
    :maxdepth: 1
 
+   config
    consumers
+   daemon
    rattail
+   watchers
docs/api/rattail/datasync/watchers.rst
new file 100644
@@ -0,0 +1,5 @@
+``rattail.datasync.watchers``
+=============================
+
+.. automodule:: rattail.datasync.watchers
+   :members:
rattail/datasync/__init__.py
@@ -2,7 +2,7 @@
 ################################################################################
 #
 #  Rattail -- Retail Software Framework
-#  Copyright © 2010-2021 Lance Edgar
+#  Copyright © 2010-2022 Lance Edgar
 #
 #  This file is part of Rattail.
 #
@@ -21,7 +21,9 @@
 #
 ################################################################################
 """
-DataSync Daemon
+This subpackage contains logic for the Rattail "datasync" daemon.
+
+For general info see :doc:`rattail-manual:/data/sync/index`.
 """
 
 from __future__ import unicode_literals, absolute_import
rattail/datasync/daemon.py
@@ -28,7 +28,6 @@ from __future__ import unicode_literals, absolute_import
 
 import sys
 import time
-import datetime
 import logging
 from traceback import format_exception
 
@@ -37,7 +36,6 @@ from rattail.daemon import Daemon
 from rattail.threads import Thread
 from rattail.datasync.config import load_profiles
 from rattail.datasync.util import get_lastrun, get_lastrun_setting, get_lastrun_timefmt, next_batch_id
-from rattail.time import make_utc, localtime
 
 
 log = logging.getLogger(__name__)
@@ -46,11 +44,37 @@ log = logging.getLogger(__name__)
 class DataSyncDaemon(Daemon):
     """
     Linux daemon implementation of DataSync.
+
+    This is normally started via command line, e.g.:
+
+    .. code-block:: sh
+
+       cd /srv/envs/poser
+       bin/rattail -c app/datasync.conf datasync start
+
+    .. note::
+       Even though the naming implies a "proper" daemon, it will not
+       actually daemonize itself.  For true daemon behavior, you
+       should run this using a wrapper such as `supervisor`_.
+
+       .. _supervisor: http://supervisord.org
     """
 
     def run(self):
         """
         Starts watcher and consumer threads according to configuration.
+
+        A separate thread is started for each watcher defined in
+        config.  :func:`watch_for_changes()` is the target callable
+        for each of these threads.
+
+        Additionally, a separate thread is started for each consumer
+        defined for the watcher.  :func:`consume_changes_perpetual()`
+        is the target callable for each of these threads.
+
+        Once all threads are started, this (main) thread loops forever
+        (so as to stay alive, and hence keep child threads alive) but
+        takes no further actions.
         """
         for key, profile in load_profiles(self.config).items():
 
@@ -69,7 +93,7 @@ class DataSyncDaemon(Daemon):
                 for consumer in profile.consumers:
                     name = '{}-consumer-{}'.format(key, consumer.key)
                     log.debug("starting thread '%s' with consumer: %s", name, consumer.spec)
-                    thread = Thread(target=consume_changes_perpetual, name=name, args=(self.config, profile, consumer))
+                    thread = Thread(target=consume_changes_perpetual, name=name, args=(self.config, consumer))
                     thread.daemon = True
                     thread.start()
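The pattern in this hunk amounts to one daemon thread per watcher plus one per consumer, with the main thread kept alive but otherwise idle.  A minimal standalone sketch of that shape using plain threading — the profile objects and the `watch`/`consume` callables here are simplified stand-ins, not the actual Rattail API:

    import time
    from threading import Thread

    def start_datasync_threads(config, profiles, watch, consume):
        # one thread per watcher, and one per each of its consumers
        for key, profile in profiles.items():
            thread = Thread(target=watch, name='{}-watcher'.format(key),
                            args=(config, profile.watcher))
            thread.daemon = True  # dies along with the main thread
            thread.start()
            for consumer in profile.consumers:
                name = '{}-consumer-{}'.format(key, consumer.key)
                thread = Thread(target=consume, name=name,
                                args=(config, consumer))
                thread.daemon = True
                thread.start()
        # main thread now loops forever, only to keep children alive
        while True:
            time.sleep(1)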
@@ -82,10 +106,27 @@
 
 def watch_for_changes(config, watcher):
     """
-    Target for DataSync watcher threads.
+    Target for datasync watcher threads.
+
+    This function will loop forever (barring an error) and
+    periodically invoke the ``watcher`` object to check for any new
+    changes, then pause before doing it again, etc.  It also tries to
+    detect errors and handle them gracefully.
+
+    The watcher itself is of course responsible for the mechanics of
+    checking for new changes.  It also determines how frequently the
+    checks should happen, etc.
+
+    Each time the watcher finds/returns changes, this function will
+    invoke :func:`record_or_process_changes()` with the result.
+
+    :param config: Config object for the app.
+
+    :param watcher: Reference to datasync watcher object.
     """
+    app = config.get_app()
+
-    # Let watcher do any setup it needs.
+    # let watcher do any setup it needs
     watcher.setup()
 
     # the 'last run' value is maintained as zone-aware UTC
@@ -93,13 +134,17 @@
     lastrun_setting = get_lastrun_setting(config, watcher.key)
     timefmt = get_lastrun_timefmt(config)
 
+    # outer loop - this should never terminate unless an unhandled
+    # exception happens, or the daemon is stopped
     while True:
 
-        thisrun = make_utc(tzinfo=True)
+        # reset for inner loop
+        thisrun = app.make_utc(tzinfo=True)
         attempts = 0
-        errtype = None
-        while True:
+        last_error_type = None
 
+        # inner loop - this is for sake of retry
+        while True:
             attempts += 1
 
             try:
@@ -125,47 +170,80 @@
 
                 # If this exception is not the first, and is of a different type
                 # than seen previously, do *not* continue to retry.
-                if errtype is not None and not isinstance(error, errtype):
+                if last_error_type is not None and not isinstance(error, last_error_type):
                     log.exception("new exception differs from previous one(s), "
                                   "giving up on watcher.get_changes()")
                     raise
 
-                # Record the type of exception seen, and pause for next retry.
-                errtype = type(error)
-                log.warning("attempt #{} failed for '{}' watcher.get_changes()".format(attempts, watcher.key))
-                log.debug("pausing for {} seconds before making attempt #{} of {}".format(
-                    watcher.retry_delay, attempts + 1, watcher.retry_attempts))
+                # remember which type of error this was; pause for next retry
+                last_error_type = type(error)
+                log.warning("attempt #%s of %s for watcher.get_changes() failed",
+                            attempts, watcher.retry_attempts, exc_info=True)
+                log.debug("pausing for %s seconds before making next attempt",
+                          watcher.retry_delay)
                 if watcher.retry_delay:
                     time.sleep(watcher.retry_delay)
 
-            else:
-                # watcher got changes okay, so record/process and prune, then
-                # break out of inner loop to reset the attempt count for next grab
+            else: # watcher got changes okay (possibly empty set)
 
+                # record new lastrun time
                 lastrun = thisrun
-                api.save_setting(None, lastrun_setting, lastrun.strftime(timefmt))
+                try:
+                    api.save_setting(None, lastrun_setting,
+                                     lastrun.strftime(timefmt))
+                except:
+                    log.exception("failed to save lastrun time")
+                    raise
 
                 if changes:
-                    log.debug("got {} changes from '{}' watcher".format(len(changes), watcher.key))
-                    # TODO: and what if this step fails..??
-                    if record_or_process_changes(config, watcher, changes):
-                        prune_changes(watcher, changes)
+                    log.debug("got %s changes from watcher", len(changes))
+
+                    # record or process changes (depends on watcher)
+                    try:
+                        record_or_process_changes(config, watcher, changes)
+                    except:
+                        log.exception("failed to record/process changes")
+                        raise
 
+                    # prune changes if applicable (depends on watcher)
+                    try:
+                        prune_changes(config, watcher, changes)
+                    except:
+                        log.exception("failed to prune changes")
+                        raise
 
+                # break out of inner loop to reset the attempt count
+                # for next grab
                 break
 
-        # sleep a moment between successful change grabs
+        # pause between successful change grabs
        time.sleep(watcher.delay)
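The retry policy above is worth distilling: keep retrying while the failures look alike, but give up as soon as a different exception type appears, since that likely signals a new problem.  A generic sketch of just that policy, with nothing Rattail-specific assumed:

    import time

    def call_with_retry(func, max_attempts, retry_delay=0):
        # retry while failures look alike; give up on a new error type
        attempts = 0
        last_error_type = None
        while True:
            attempts += 1
            try:
                return func()
            except Exception as error:
                if attempts >= max_attempts:
                    raise
                if last_error_type is not None and not isinstance(error, last_error_type):
                    raise  # different kind of failure; stop retrying
                last_error_type = type(error)
                if retry_delay:
                    time.sleep(retry_delay)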
 
 def record_or_process_changes(config, watcher, changes):
     """
-    Now that we have changes from the watcher, we'll either record them as
-    proper DataSync changes, or else let the watcher process them (if it
-    consumes self).
+    This function is responsible for the "initial" handling of changes
+    obtained from a watcher.  What this means will depend on the
+    watcher, as follows:
+
+    Most watchers are just that - their only job is to report new
+    changes to datasync.  In this case this function will merely
+    "record" the changes, by inserting them into the
+    ``datasync_change`` queue table for further processing by the
+    consumer(s).
+
+    But some watchers "consume self" - meaning any changes they find,
+    they also are responsible for processing.  In this case this
+    function will "process" (consume) the changes directly, by
+    invoking the watcher to do so.  These changes will *not* be added
+    to the queue table for any other consumer(s) to process.
+
+    :param config: Config object for the app.
+
+    :param watcher: Reference to datasync watcher, from whence changes
+       came.
 
-    :changes: List of changes obtained from the watcher.
+    :param changes: List of changes obtained from the watcher.
 
     :returns: ``True`` if all goes well, ``False`` if error.
@@ -177,15 +255,19 @@ def record_or_process_changes(config, watcher, changes):
     app = config.get_app()
     model = config.get_model()
 
-    # If watcher consumes itself, then it will process its own changes.  Note
-    # that there are no assumptions made about the format or structure of these
-    # change objects.
+    # if watcher consumes itself, then it will process its own
+    # changes.  note that there are no assumptions made about the
+    # format or structure of these change objects.
     if watcher.consumes_self:
-        session = app.make_session()
+        try:
+            session = app.make_session()
+        except:
+            log.exception("failed to make session")
+            raise
         try:
             watcher.process_changes(session, changes)
-        except Exception:
-            log.exception("error calling watcher.process_changes()")
+        except:
+            log.exception("watcher failed to process its changes")
             session.rollback()
             raise
         else:
@@ -195,18 +277,25 @@ def record_or_process_changes(config, watcher, changes):
         finally:
             session.close()
 
-    else:
-        # Give all change stubs the same timestamp, to help identify them
-        # as a "batch" of sorts, so consumers can process them as such.
-        # (note, this is less important for identifiying a batch now that we
-        # have batch_id, but is probably still helpful anyway)
-        now = datetime.datetime.utcnow()
-
-        # Save change stub records to rattail database, for consumer thread
-        # to find and process.
-        saved = 0
+    # will record changes to consumer queue...
+
+    # give all change stubs the same timestamp, to help identify them
+    # as a "batch" of sorts, so consumers can process them as such.
+    # (note, this is less important for identifiying a batch now that
+    # we have batch_id, but is probably still helpful anyway)
+    now = app.make_utc()
+
+    # save change stub records to rattail database, for consumer
+    # thread to find and process
+    saved = 0
+    try:
         session = app.make_session()
-        # assign new/unique batch_id so that consumers can keep things straight
+    except:
+        log.exception("failed to make session for recording changes")
+        raise
+    try:
+        # assign new/unique batch_id so that consumers can keep things
+        # straight
         batch_id = next_batch_id(session)
         batch_seq = 0
         for key, change in changes:
@@ -223,32 +312,52 @@
                     consumer=consumer))
                 saved += 1
             session.flush()
+    except:
+        log.exception("failed to record changes")
+        session.rollback()
+        raise
+    else:
         session.commit()
+    finally:
         session.close()
-        log.debug("saved {} '{}' changes to datasync queue".format(saved, watcher.key))
-        return True
+
+    log.debug("saved %s '%s' changes to datasync queue", saved, watcher.key)
+    return True
 
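Nearly every block this changeset touches converges on the same session lifecycle: fail loudly if the session cannot even be created, log and roll back (then re-raise) on error, commit on success, and always close.  A generic sketch of that shape, with `make_session` standing in for `app.make_session()`:

    import logging

    log = logging.getLogger(__name__)

    def run_in_session(make_session, work):
        # obtaining the session is its own failure mode, handled first
        try:
            session = make_session()
        except Exception:
            log.exception("failed to make session")
            raise
        try:
            result = work(session)
        except Exception:
            log.exception("work failed; rolling back")
            session.rollback()
            raise
        else:
            session.commit()
            return result
        finally:
            session.close()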
-def prune_changes(watcher, changes):
+def prune_changes(config, watcher, changes):
     """
     Tell the watcher to prune the original change records from its source
     database, if relevant.
     """
-    if watcher.prunes_changes:
-        try:
-            # Note that we only give it the keys for this.
-            pruned = watcher.prune_changes([c[0] for c in changes])
-        except:
-            log.exception("error calling watcher.prune_changes()")
-            raise
-        else:
-            if pruned is not None:
-                log.debug("pruned {} changes from '{}' database".format(pruned, watcher.key))
+    if not watcher.prunes_changes:
+        return
+
+    try:
+        # note that we only give it the keys for this
+        pruned = watcher.prune_changes([c[0] for c in changes])
+    except:
+        log.exception("failed to prune changes")
+        raise
+    if pruned is not None:
+        log.debug("watcher pruned %s changes", pruned)
 
-def consume_changes_perpetual(config, profile, consumer):
+def consume_changes_perpetual(config, consumer):
     """
-    Target for DataSync consumer thread.
+    Target for datasync consumer threads.
+
+    This function will loop forever (barring an error) and
+    periodically invoke the ``consumer`` object to process any changes
+    in the queue, then pause before doing it again, etc.  It also
+    tries to detect errors and handle them gracefully.
+
+    This function is mostly just the loop itself; it calls
+    :func:`consume_current_changes()` during each iteration.
+
+    :param config: Config object for the app.
+
+    :param consumer: Reference to datasync consumer object.
     """
     # tell consumer to do initial setup
     consumer.setup()
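The docstring above describes the consumer thread's whole life: one-time setup, then a perpetual loop which lets the thread die once consumption reports failure.  A minimal sketch of that loop shape, with `consume` standing in for consume_current_changes():

    import time

    def consume_perpetually(consumer, consume, delay=1):
        # one-time setup, then loop until consumption reports failure
        consumer.setup()
        while True:
            if not consume(consumer):
                # consumption failed; exit the loop and let the
                # thread die, so problems do not silently compound
                break
            time.sleep(delay)  # wait (1 sec by default), then repeat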
@@ -257,74 +366,163 @@ def consume_changes_perpetual(config, profile, consumer):
 
     while True:
 
         # try to consume all current changes
-        if not consume_current_changes(config, profile, consumer):
+        try:
+            result = consume_current_changes(config, consumer)
+        except:
+            log.exception("failed to consume current changes")
+            raise
 
-            # consumption failed, so exit the perma-loop
-            # (this thread is now dead)
+        if not result:
+            # consumption failed, so exit the perma-loop (this thread
+            # is now dead)
             break
 
         # wait 1 sec by default, then look for more changes
         time.sleep(consumer.delay)
 
-def consume_current_changes(config, profile, consumer):
+def consume_current_changes(config, consumer):
     """
     Consume all changes currently available for the given consumer.
 
-    :param config: Config object for the app.
+    The datasync queue table will be checked, and if it contains any
+    changes applicable to the given consumer, then the consumer will
+    be invoked to process the changes.
+
+    If there are no applicable changes in the queue, this function
+    will return without taking any real action.  But if there are
+    changes, then it tries to be smart about processing them in the
+    correct order, as follows:
+
+    The changes are sorted by
+    :attr:`~rattail.db.model.datasync.DataSyncChange.obtained` in
+    order to determine the earliest timestamp.  Then it calls
+    :func:`consume_changes_from()` with that timestamp.
 
-    :param profile: Reference to datasync config profile, which in
-       turn contains the watcher reference.
+    Once all changes with that timestamp have been processed
+    (consumed), this function again looks for any applicable changes
+    in the queue, sorting by timestamp and then calling
+    :func:`consume_changes_from()` with earliest timestamp.
 
-    :param consumer: Reference to datasync consumer, for which changes
-       should be processed (consumed).
+    This process repeats until there are no longer any changes in the
+    queue which pertain to the given consumer.
+
+    :param config: Config object for the app.
+
+    :param consumer: Reference to datasync consumer object.
 
     :returns: ``True`` if all goes well, ``False`` if error.
     """
     app = config.get_app()
     model = config.get_model()
 
-    def get_first_change():
+    try:
+        session = app.make_session()
+    except:
+        log.exception("failed to make session for consuming changes")
+        raise
+
+    def get_first_change():
         change = session.query(model.DataSyncChange)\
                         .filter(model.DataSyncChange.source == consumer.watcher.key)\
                         .filter(model.DataSyncChange.consumer == consumer.key)\
                         .order_by(model.DataSyncChange.obtained)\
                         .first()
-        session.close()
         return change
 
     # determine first 'obtained' timestamp
-    first = get_first_change()
+    try:
+        first = get_first_change()
+    except:
+        log.exception("failed to get first change")
+        session.close()
+        return False
 
+    error = False
     while first:
 
         # try to consume these changes
-        if not consume_changes_from(config, consumer, first.obtained):
+        try:
+            if not consume_changes_from(config, session, consumer,
+                                        first.obtained):
+                error = True
+        except:
+            error = True
+            log.exception("failed to consume changes obtained at: %s",
+                          first.obtained)
 
+        if error:
             break
 
-        else: # fetch next 'obtained' timestamp
+        # fetch next 'obtained' timestamp
+        try:
             first = get_first_change()
+        except:
+            log.exception("failed to get next-first change")
+            break
 
+    # no more changes! (or perhaps an error)
+    session.close()
+    return not error
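The queue-draining strategy documented above reduces to a simple loop: find the earliest 'obtained' timestamp still applicable, consume that batch, and repeat until nothing remains.  Sketched generically — both callables are stand-ins for the session queries in the real code:

    def drain_queue(get_first_change, consume_from):
        # process the queue one 'obtained' timestamp at a time,
        # oldest first, until empty (or until an error occurs)
        first = get_first_change()
        while first:
            if not consume_from(first.obtained):
                return False  # error; the calling thread should stop
            first = get_first_change()
        return True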
-def consume_changes_from(config, consumer, obtained):
+def consume_changes_from(config, session, consumer, obtained):
     """
     Consume all changes which were "obtained" at the given timestamp.
 
-    This causes the consumer to process the changes, which are then
-    removed from the queue.  The session which contained the changes
-    will be committed, so that they are truly removed, and "progress"
-    is somewhat more transparent.
+    This fetches all changes from the datasync queue table, which
+    correspond to the given consumer and which have an
+    :attr:`~rattail.db.model.datasync.DataSyncChange.obtained` value
+    matching the one specified.
+
+    There are two possibilities here: either the matching changes are
+    part of a "true" batch (i.e. they have a
+    :attr:`~rattail.db.model.datasync.DataSyncChange.batch_id` value),
+    or not.
+
+    This function therefore first looks for changes which *do* have a
+    ``batch_id``.  If found, it then sorts those changes by
+    :attr:`~rattail.db.model.datasync.DataSyncChange.batch_sequence`
+    to be sure they are processed in the correct order.
+
+    If none of the changes have a ``batch_id`` then this function does
+    not sort the changes in any way; they will be processed in
+    (presumably) random order.
+
+    In any case, regardless of ``batch_id``, at this point the
+    function has identified a set of changes to be processed as a
+    "batch" by the consumer.  But the larger the batch, the longer it
+    will take for the consumer to process it.  This brings a couple of
+    issues:
+
+    If the consumer is a Rattail DB, and data versioning is enabled,
+    this may cause rather massive resource usage if too many data
+    writes happen.
+
+    Additionally, there is no true "progress indicator" for datasync
+    at this time.  A semi-practical way to determine its progress is
+    simply to view the queue table and see what if anything it
+    contains (when empty, processing is complete).  The set of changes
+    being processed here, will be removed from the queue only after
+    being processed.  Hence, the larger the batch, the "less granular"
+    the "progress indicator" will be.
+
+    To address these issues then, this function may "prune" the set of
+    changes such that only so many are processed at a time.
+
+    And finally, we have a (possibly smaller) set of changes to be
+    processed.  This function will then ask the consumer to begin a
+    new transaction, then process the changes, and ultimately commit
+    the transaction.
+
+    Once processing is complete (i.e. assuming no error) then those
+    changes are removed from the queue.
 
     :param config: Config object for the app.
 
-    :param consumer: Reference to datasync consumer, for which changes
-       should be processed (consumed).
+    :param session: Current session for Rattail DB.
+
+    :param consumer: Reference to datasync consumer object.
 
     :param obtained: UTC "obtained" timestamp for the first change in
        the queue.  This is used to filter existing changes, i.e. we
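The batch-selection rules in this docstring amount to a small pre-processing step.  Sketched here on the assumption that each change row carries `batch_id` and `batch_sequence` attributes, as the docstring describes — this is not the function's actual code, which the hunks below only partially show:

    def select_batch(changes, batch_size):
        # prefer changes belonging to a "true" batch, in sequence order
        batched = [c for c in changes if c.batch_id]
        if batched:
            changes = sorted(batched, key=lambda c: c.batch_sequence)
        # cap the batch, both to limit resource usage (e.g. from data
        # versioning) and to keep the queue table useful as a rough
        # progress indicator
        return changes[:batch_size]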
@@ -334,7 +532,6 @@ def consume_changes_from(config, consumer, obtained):
     :returns: ``True`` if all goes well, ``False`` if error.
     """
     app = config.get_app()
-    session = app.make_session()
     model = config.get_model()
 
     # we only want changes "obtained" at the given time.  however, at least
@@ -371,7 +568,7 @@ def consume_changes_from(config, consumer, obtained):
         changes = changes[:batch_size]
 
     log.debug("will process %s changes from %s", len(changes),
-              localtime(config, obtained, from_utc=True))
+              app.localtime(obtained, from_utc=True))
     consumer.begin_transaction()
 
     attempts = 0
@@ -382,13 +579,15 @@
 
         try:
             consumer.process_changes(session, changes)
             session.flush()
 
         except Exception as errobj: # processing failed!
             exc_type, exc, traceback = sys.exc_info()
 
-            consumer.rollback_transaction()
             session.rollback()
+            try:
+                consumer.rollback_transaction()
+            except:
+                log.exception("consumer failed to rollback transaction")
+                return False
 
             # if we've reached our final attempt, stop retrying
             if attempts >= consumer.retry_attempts:
@@ -403,7 +602,6 @@ def consume_changes_from(config, consumer, obtained):
                     'traceback': ''.join(format_exception(exc_type, exc, traceback)).strip(),
                     'datasync_url': config.datasync_url(),
                 })
-                session.close()
                 return False
 
             # if this exception is not the first, and is of a different type
 
            if errtype is not None and not isinstance(errobj, errtype):
 
                log.exception("new exception differs from previous one(s), "
 
                              "giving up on consumer.process_changes()")
 
                session.close()
 
                return False
 

	
 
            # record the type of exception seen; maybe pause before next retry
 
@@ -425,19 +622,26 @@ def consume_changes_from(config, consumer, obtained):
 
         else: # consumer processed changes okay
 
-            consumer.commit_transaction()
+            # commit consumer transaction
+            try:
+                consumer.commit_transaction()
+            except:
+                log.exception("consumer failed to commit transaction")
+                return False
 
             # delete these changes from datasync queue
-            for i, change in enumerate(changes):
-                session.delete(change)
-                if i % 200 == 0:
-                    session.flush()
-            session.flush()
-            session.commit()
+            try:
+                for i, change in enumerate(changes):
+                    session.delete(change)
+                    if i % 200 == 0:
+                        session.flush()
+                session.commit()
+            except:
+                log.exception("failed to delete changes from queue")
+                return False
 
             # can stop the attempt/retry loop now
             log.debug("processed %s changes", len(changes))
             break
 
-    session.close()
     return True
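One detail in the hunks above transfers well beyond datasync: the error-report payload captures the full traceback text via `sys.exc_info()` and `traceback.format_exception()`.  In isolation that capture looks like this, with `do_work` as a hypothetical failing operation:

    import sys
    from traceback import format_exception

    def do_work():
        raise RuntimeError("boom")  # hypothetical failure

    try:
        do_work()
    except Exception:
        exc_type, exc, tb = sys.exc_info()
        detail = ''.join(format_exception(exc_type, exc, tb)).strip()
        # 'detail' now holds the complete traceback text, suitable
        # for inclusion in an error report or email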
rattail/datasync/rattail.py
@@ -141,16 +141,27 @@ class RattailWatcher(DataSyncWatcher):
     def prune_changes(self, keys):
         model = self.model
         deleted = 0
-        session = self.app.make_session(bind=self.engine)
-        for key in keys:
-            if key: # note that key can sometimes be None
-                change = session.query(model.Change).get(key)
-                if change:
-                    session.delete(change)
-                    session.flush()
-                    deleted += 1
-        session.commit()
-        session.close()
+        try:
+            session = self.app.make_session(bind=self.engine)
+        except:
+            log.exception("failed to make session for pruning changes")
+            raise
+        try:
+            for key in keys:
+                if key: # note that key can sometimes be None
+                    change = session.query(model.Change).get(key)
+                    if change:
+                        session.delete(change)
+                        session.flush()
+                        deleted += 1
+        except:
+            log.exception("failed to prune changes")
+            session.rollback()
+            raise
+        else:
+            session.commit()
+        finally:
+            session.close()
         return deleted
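For reference, the daemon side (prune_changes in daemon.py above) drives this watcher method with change keys only, and simply logs whatever count the watcher reports back.  A compact restatement of that contract, assuming the (key, payload) tuple shape the daemon uses for changes:

    import logging

    log = logging.getLogger(__name__)

    def prune_if_supported(watcher, changes):
        # hand the watcher only the change keys; it reports back how
        # many records it actually deleted (or None if unknown)
        if not watcher.prunes_changes:
            return
        pruned = watcher.prune_changes([key for key, change in changes])
        if pruned is not None:
            log.debug("watcher pruned %s changes", pruned)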