Changeset - 96b0841536de
[Not reviewed]
0 1 0
Lance Edgar (lance) - 3 years ago 2021-12-08 15:44:40
lance@edbob.org
Add `pre_process_changes()` and `post_process_changes()` for datasync
1 file changed with 30 insertions and 0 deletions:
0 comments (0 inline, 0 general)
rattail/datasync/consumers.py
Show inline comments
 
@@ -59,11 +59,35 @@ class DataSyncConsumer(object):
 
        via multiple batches.
 
        """
 

	
 
    def pre_process_changes(self, session, changes):
 
        """
 
        Implement any "pre-processing" logic here.
 

	
 
        Common example would be to establish DB connection to some
 
        external system(s).
 

	
 
        In some rare cases you may also need to "sort" the changes, to
 
        ensure they are processed in a particular sequence.
 

	
 
        :returns: If you needed to modify the list of changes, you can
 
           return the new list.  If this method returns ``None``
 
           (which the default logic does) then the original set of
 
           changes will be processed.
 
        """
 

	
 
    def process_changes(self, session, changes):
 
        """
 
        Process (consume) a batch of changes.
 
        """
 

	
 
    def post_process_changes(self, session, changes):
 
        """
 
        Implement any "post-processing" logic here.
 

	
 
        Common example would be to close DB connection for external
 
        system(s).
 
        """
 

	
 
    def rollback_transaction(self):
 
        """
 
        Called when any batch of changes failed to process.
 
@@ -166,6 +190,10 @@ class NewDataSyncImportConsumer(DataSyncConsumer):
 
        """
 
        Process all changes, leveraging importer(s) as much as possible.
 
        """
 
        result = self.pre_process_changes(session, changes)
 
        if result is not None:
 
            changes = result
 

	
 
        if self.runas_username:
 
            session.set_continuum_user(self.runas_username)
 

	
 
@@ -176,6 +204,8 @@ class NewDataSyncImportConsumer(DataSyncConsumer):
 
        for change in changes:
 
            self.invoke_importer(session, change)
 

	
 
        self.post_process_changes(session, changes)
 

	
 
    def invoke_importer(self, session, change):
 
        """
 
        For the given change, invoke the default importer behavior, if one is
0 comments (0 inline, 0 general)