Changeset - eb5071c457b6
[Not reviewed]
0 2 0
Lance Edgar (lance) - 3 years ago 2022-03-26 13:34:09
lance@edbob.org
Try harder to cleanup when datasync error happens

specifically when a connection goes bad, there seems to be some issue
with logging and garbage collection maybe? hoping this change fixes
our cleanup such that gc need not cause issues..

cf. https://github.com/sqlalchemy/sqlalchemy/issues/5522
2 files changed with 29 insertions and 19 deletions:
0 comments (0 inline, 0 general)
rattail/datasync/daemon.py
Show inline comments
 
@@ -2,7 +2,7 @@
 
################################################################################
 
#
 
#  Rattail -- Retail Software Framework
 
#  Copyright © 2010-2019 Lance Edgar
 
#  Copyright © 2010-2022 Lance Edgar
 
#
 
#  This file is part of Rattail.
 
#
 
@@ -316,12 +316,10 @@ def consume_changes_from(config, consumer, obtained):
 
    """
 
    Consume all changes which were "obtained" at the given timestamp.
 

	
 
    Note that while this function must be given a Rattail database session, the
 
    function logic is responsible for committing (or rolling back) the session
 
    transaction.  The assumption here is that this session will be used *only*
 
    for managing the datasync change queue, and that we should be "saving our
 
    progress" as changes are fully consumed, i.e. those changes are deleted
 
    from the queue via session commit.
 
    This causes the consumer to process the changes, which are then
 
    removed from the queue.  The session which contained the changes
 
    will be committed, so that they are truly removed, and "progress"
 
    is somewhat more transparent.
 

	
 
    :param config: Config object for the app.
 

	
rattail/datasync/rattail.py
Show inline comments
 
@@ -28,6 +28,7 @@ from __future__ import unicode_literals, absolute_import
 

	
 
import logging
 

	
 
import six
 
from sqlalchemy import orm
 

	
 
from rattail.db.util import make_topo_sortkey
 
@@ -426,23 +427,34 @@ class FromRattailToRattailExportConsumer(FromRattailToRattailBase):
 
        return self.app.make_session(bind=self.target_engine)
 

	
 
    def process_changes(self, session, changes):
 
        local_session = self.app.make_session()
 
        target_session = self.make_target_session()
 

	
 
        if self.runas_username:
 
            target_session.set_continuum_user(self.runas_username)
 
        try:
 
            if self.runas_username:
 
                target_session.set_continuum_user(self.runas_username)
 

	
 
        # update all importers with current sessions
 
        for importer in self.importers.values():
 
            importer.host_session = session
 
            importer.session = target_session
 
            # update all importers with current sessions
 
            for importer in six.itervalues(self.importers):
 
                importer.host_session = local_session
 
                importer.session = target_session
 

	
 
            # delegate processing to importer(s)
 
            for change in changes:
 
                self.invoke_importer(session, change)
 

	
 
            # TODO: should we ever commit local?
 
            #local_session.commit()
 
            target_session.commit()
 

	
 
        # note that we no longer sort changes here, since the watcher
 
        # should have already done that...
 
        for change in changes:
 
            self.invoke_importer(session, change)
 
        except:
 
            log.exception("failed to process changes")
 
            local_session.rollback()
 
            target_session.rollback()
 

	
 
        target_session.commit()
 
        target_session.close()
 
        finally:
 
            local_session.close()
 
            target_session.close()
 

	
 

	
 
class FromRattailToRattailImportConsumer(FromRattailToRattailBase):
0 comments (0 inline, 0 general)