Changeset - 96b0841536de
[Not reviewed]
0 1 0
Lance Edgar (lance) - 3 years ago 2021-12-08 15:44:40
lance@edbob.org
Add `pre_process_changes()` and `post_process_changes()` for datasync
1 file changed with 30 insertions and 0 deletions:
0 comments (0 inline, 0 general)
rattail/datasync/consumers.py
Show inline comments
 
@@ -38,53 +38,77 @@ class DataSyncConsumer(object):
 
    """
 
    delay = 1 # seconds
 
    retry_attempts = 1
 
    retry_delay = 1 # seconds
 

	
 
    def __init__(self, config, key, dbkey=None, runas=None, watcher=None):
 
        self.config = config
 
        self.key = key
 
        self.dbkey = dbkey
 
        self.runas_username = runas
 
        self.watcher = watcher
 
        self.model = config.get_model()
 

	
 
    def setup(self):
 
        """
 
        This method is called when the consumer thread is first started.
 
        """
 

	
 
    def begin_transaction(self):
 
        """
 
        Called just before the consumer is asked to process changes, possibly
 
        via multiple batches.
 
        """
 

	
 
    def pre_process_changes(self, session, changes):
 
        """
 
        Implement any "pre-processing" logic here.
 

	
 
        Common example would be to establish DB connection to some
 
        external system(s).
 

	
 
        In some rare cases you may also need to "sort" the changes, to
 
        ensure they are processed in a particular sequence.
 

	
 
        :returns: If you needed to modify the list of changes, you can
 
           return the new list.  If this method returns ``None``
 
           (which the default logic does) then the original set of
 
           changes will be processed.
 
        """
 

	
 
    def process_changes(self, session, changes):
 
        """
 
        Process (consume) a batch of changes.
 
        """
 

	
 
    def post_process_changes(self, session, changes):
 
        """
 
        Implement any "post-processing" logic here.
 

	
 
        Common example would be to close DB connection for external
 
        system(s).
 
        """
 

	
 
    def rollback_transaction(self):
 
        """
 
        Called when any batch of changes failed to process.
 
        """
 

	
 
    def commit_transaction(self):
 
        """
 
        Called just after the consumer has successfully finished processing
 
        changes, possibly via multiple batches.
 
        """
 

	
 

	
 
class NewDataSyncImportConsumer(DataSyncConsumer):
 
    """
 
    Base class for DataSync consumer which is able to leverage a (set of)
 
    importer(s) to do the heavy lifting.
 

	
 
    .. note::
 
       This assumes "new-style" importers based on
 
       ``rattail.importing.Importer``.
 

	
 
    .. attribute:: handler_spec
 

	
 
       This should be a "spec" string referencing the import handler class from
 
@@ -145,58 +169,64 @@ class NewDataSyncImportConsumer(DataSyncConsumer):
 
            if local_name in importers:
 
                importers[host_name] = importers[local_name]
 

	
 
        if self.skip_local_models:
 
            for name in list(importers):
 
                if name in self.skip_local_models:
 
                    del importers[name]
 

	
 
        return importers
 

	
 
    def get_importers_from_handler(self, handler, default_only=True):
 
        if default_only:
 
            keys = handler.get_default_keys()
 
        else:
 
            keys = handler.get_importer_keys()
 

	
 
        importers = dict([(key, handler.get_importer(key))
 
                          for key in keys])
 
        return importers
 

	
 
    def process_changes(self, session, changes):
 
        """
 
        Process all changes, leveraging importer(s) as much as possible.
 
        """
 
        result = self.pre_process_changes(session, changes)
 
        if result is not None:
 
            changes = result
 

	
 
        if self.runas_username:
 
            session.set_continuum_user(self.runas_username)
 

	
 
        # Update all importers with current Rattail session.
 
        for importer in self.importers.values():
 
            importer.session = session
 

	
 
        for change in changes:
 
            self.invoke_importer(session, change)
 

	
 
        self.post_process_changes(session, changes)
 

	
 
    def invoke_importer(self, session, change):
 
        """
 
        For the given change, invoke the default importer behavior, if one is
 
        available.
 
        """
 
        importer = self.importers.get(change.payload_type)
 
        if importer:
 
            if not change.deletion:
 
                return self.process_change(session, importer, change)
 
            elif importer.allow_delete:
 
                return self.process_deletion(session, importer, change)
 

	
 
    def process_change(self, session, importer, change=None, host_object=None, host_data=None):
 
        """
 
        Invoke the importer to process the given change / host record.
 
        """
 
        if host_data is None:
 
            if host_object is None:
 
                host_object = self.get_host_object(session, change)
 
                if host_object is None:
 
                    return
 
            host_data = importer.normalize_host_object(host_object)
 
            if host_data is None:
 
                return
0 comments (0 inline, 0 general)