Changeset - 96b0841536de
[Not reviewed]
0 1 0
Lance Edgar (lance) - 3 years ago 2021-12-08 15:44:40
lance@edbob.org
Add `pre_process_changes()` and `post_process_changes()` for datasync
1 file changed with 30 insertions and 0 deletions:
0 comments (0 inline, 0 general)
rattail/datasync/consumers.py
Show inline comments
 
@@ -50,29 +50,53 @@ class DataSyncConsumer(object):
 

	
 
    def setup(self):
 
        """
 
        This method is called when the consumer thread is first started.
 
        """
 

	
 
    def begin_transaction(self):
 
        """
 
        Called just before the consumer is asked to process changes, possibly
 
        via multiple batches.
 
        """
 

	
 
    def pre_process_changes(self, session, changes):
 
        """
 
        Implement any "pre-processing" logic here.
 

	
 
        Common example would be to establish DB connection to some
 
        external system(s).
 

	
 
        In some rare cases you may also need to "sort" the changes, to
 
        ensure they are processed in a particular sequence.
 

	
 
        :returns: If you needed to modify the list of changes, you can
 
           return the new list.  If this method returns ``None``
 
           (which the default logic does) then the original set of
 
           changes will be processed.
 
        """
 

	
 
    def process_changes(self, session, changes):
 
        """
 
        Process (consume) a batch of changes.
 
        """
 

	
 
    def post_process_changes(self, session, changes):
 
        """
 
        Implement any "post-processing" logic here.
 

	
 
        Common example would be to close DB connection for external
 
        system(s).
 
        """
 

	
 
    def rollback_transaction(self):
 
        """
 
        Called when any batch of changes failed to process.
 
        """
 

	
 
    def commit_transaction(self):
 
        """
 
        Called just after the consumer has successfully finished processing
 
        changes, possibly via multiple batches.
 
        """
 

	
 

	
 
@@ -157,34 +181,40 @@ class NewDataSyncImportConsumer(DataSyncConsumer):
 
            keys = handler.get_default_keys()
 
        else:
 
            keys = handler.get_importer_keys()
 

	
 
        importers = dict([(key, handler.get_importer(key))
 
                          for key in keys])
 
        return importers
 

	
 
    def process_changes(self, session, changes):
 
        """
 
        Process all changes, leveraging importer(s) as much as possible.
 
        """
 
        result = self.pre_process_changes(session, changes)
 
        if result is not None:
 
            changes = result
 

	
 
        if self.runas_username:
 
            session.set_continuum_user(self.runas_username)
 

	
 
        # Update all importers with current Rattail session.
 
        for importer in self.importers.values():
 
            importer.session = session
 

	
 
        for change in changes:
 
            self.invoke_importer(session, change)
 

	
 
        self.post_process_changes(session, changes)
 

	
 
    def invoke_importer(self, session, change):
 
        """
 
        For the given change, invoke the default importer behavior, if one is
 
        available.
 
        """
 
        importer = self.importers.get(change.payload_type)
 
        if importer:
 
            if not change.deletion:
 
                return self.process_change(session, importer, change)
 
            elif importer.allow_delete:
 
                return self.process_deletion(session, importer, change)
 

	
0 comments (0 inline, 0 general)