Changeset - 1e2491a5a9fa
[Not reviewed]
0 3 0
Lance Edgar (lance) - 11 years ago 2013-11-16 01:07:57
lance@edbob.org
More db sync fixes (tested on Windows).
3 files changed with 11 insertions and 9 deletions:
0 comments (0 inline, 0 general)
rattail/db/sync/__init__.py
Show inline comments
 
@@ -82,49 +82,49 @@ def dependency_sort(x, y):
 
            dep_y.append(key)
 

	
 
    if dep_x and not dep_y:
 
        return 1
 
    if dep_y and not dep_x:
 
        return -1
 
    return 0
 

	
 

	
 
class Synchronizer(object):
 
    """
 
    Default implementation of database synchronization logic.  Subclass this if
 
    you have special processing needs.
 
    """
 

	
 
    def __init__(self, local_engine, remote_engines):
 
        self.local_engine = local_engine
 
        self.remote_engines = remote_engines
 

	
 
    def loop(self):
 
        log.info("Synchronizer.loop: using remote engines: {0}".format(
 
                ', '.join(self.remote_engines.iterkeys())))
 
        while True:
 
            try:
 
                self.synchronize_changes()
 
                self.synchronize()
 
            except OperationalError, error:
 
                if error.connection_invalidated:
 
                    # Presumably a database server restart; give it a moment
 
                    # and try again.
 
                    self.sleep(5)
 
                else:
 
                    raise
 
            self.sleep(3)
 

	
 
    def sleep(self, seconds): # pragma no cover
 
        if sys.platform == 'win32':
 
            win32api.Sleep(seconds * 1000)
 
        else:
 
            time.sleep(seconds)
 

	
 
    def synchronize(self):
 
        local_session = Session(bind=self.local_engine)
 
        local_changes = local_session.query(model.Change).all()
 
        log.debug("Synchronizer.synchronize: found {0} changes to synchronize".format(len(local_changes)))
 
        if len(local_changes):
 

	
 
            remote_sessions = {}
 
            for key, remote_engine in self.remote_engines.iteritems():
 
                remote_sessions[key] = Session(bind=remote_engine)
 
@@ -282,30 +282,32 @@ class Synchronizer(object):
 
        # First remove customer associations.
 
        q = session.query(model.CustomerGroupAssignment)\
 
            .filter(model.CustomerGroupAssignment.group == group)
 
        for assignment in q:
 
            session.delete(assignment)
 

	
 

	
 
def synchronize_changes(local_engine, remote_engines):
 
    """
 
    This function will instantiate a ``Synchronizer`` class according to
 
    configuration.  (The default class is :class:`Synchronizer`.)  This
 
    instance is then responsible for implementing the sync logic.
 

	
 
    .. highlight:: ini
 

	
 
    If you need to override the default synchronizer class, put something like
 
    the following in your config file::
 

	
 
       [rattail.db]
 
       sync.synchronizer_class = myapp.sync:MySynchronizer
 
    """
 

	
 
    factory = edbob.config.get('rattail.db', 'sync.synchronizer_class')
 
    if factory:
 
        log.debug("synchronize_changes: using custom synchronizer factory: {0}".format(repr(factory)))
 
        factory = edbob.load_spec(factory)
 
    else:
 
        log.debug("synchronize_changes: using default synchronizer factory")
 
        factory = Synchronizer
 

	
 
    synchronizer = factory(local_engine, remote_engines)
 
    synchronizer.loop()
rattail/db/sync/win32.py
Show inline comments
 
@@ -10,71 +10,71 @@
 
#  Rattail is free software: you can redistribute it and/or modify it under the
 
#  terms of the GNU Affero General Public License as published by the Free
 
#  Software Foundation, either version 3 of the License, or (at your option)
 
#  any later version.
 
#
 
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
 
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 
#  FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
 
#  more details.
 
#
 
#  You should have received a copy of the GNU Affero General Public License
 
#  along with Rattail.  If not, see <http://www.gnu.org/licenses/>.
 
#
 
################################################################################
 

	
 
"""
 
``rattail.db.sync.win32`` -- Database Synchronization for Windows
 
"""
 

	
 
import sys
 
import logging
 
import threading
 

	
 
import edbob
 
from edbob.win32 import Service
 
from edbob import db
 

	
 
from rattail.win32.service import Service
 
from rattail.db.sync import get_sync_engines, synchronize_changes
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class DatabaseSynchronizerService(Service):
 
    """
 
    Implements database synchronization as a Windows service.
 
    """
 

	
 
    _svc_name_ = 'RattailDatabaseSynchronizer'
 
    _svc_display_name_ = "Rattail : Database Synchronization Service"
 
    _svc_description_ = ("Monitors the local Rattail database for changes, "
 
                         "and synchronizes them to the configured remote "
 
                         "database(s).")
 

	
 
    appname = 'rattail'
 

	
 
    def Initialize(self):
 
        """
 
        Service initialization.
 
        """
 

	
 
        if not Service.Initialize(self):
 
            return False
 

	
 
        edbob.init_modules(['rattail.db'])
 

	
 
        engines = get_sync_engines()
 
        if not engines:
 
        remote_engines = get_sync_engines()
 
        if not remote_engines:
 
            return False
 

	
 
        thread = threading.Thread(target=synchronize_changes,
 
                                  args=(engines,))
 
                                  args=(db.engine, remote_engines))
 
        thread.daemon = True
 
        thread.start()
 

	
 
        return True
 

	
 

	
 
if __name__ == '__main__':
 
    if sys.platform == 'win32':
 
        import win32serviceutil
 
        win32serviceutil.HandleCommandLine(DatabaseSynchronizerService)
tests/db/sync/test_init.py
Show inline comments
 
@@ -15,54 +15,54 @@ class SynchronizerTests(TestCase):
 

	
 
    def setUp(self):
 
        self.local_engine = create_engine('sqlite://')
 
        self.remote_engines = {
 
            'one': create_engine('sqlite://'),
 
            'two': create_engine('sqlite://'),
 
            }
 
        model.Base.metadata.create_all(bind=self.local_engine)
 
        model.Base.metadata.create_all(bind=self.remote_engines['one'])
 
        model.Base.metadata.create_all(bind=self.remote_engines['two'])
 

	
 
    def test_init(self):
 
        synchronizer = sync.Synchronizer(self.local_engine, self.remote_engines)
 
        self.assertIs(synchronizer.local_engine, self.local_engine)
 
        self.assertIs(synchronizer.remote_engines, self.remote_engines)
 

	
 
    def test_loop(self):
 

	
 
        class FakeOperationalError(OperationalError):
 
            def __init__(self, connection_invalidated):
 
                self.connection_invalidated = connection_invalidated
 

	
 
        synchronizer = sync.Synchronizer(self.local_engine, self.remote_engines)
 
        with patch.object(synchronizer, 'sleep') as sleep:
 
            with patch.object(synchronizer, 'synchronize_changes') as synchronize_changes:
 
            with patch.object(synchronizer, 'synchronize') as synchronize:
 

	
 
                synchronize_changes.side_effect = [1, 2, 3, FakeOperationalError(True),
 
                synchronize.side_effect = [1, 2, 3, FakeOperationalError(True),
 
                                                   5, 6, 7, FakeOperationalError(False)]
 
                self.assertRaises(FakeOperationalError, synchronizer.loop)
 
                self.assertEqual(synchronize_changes.call_count, 8)
 
                self.assertEqual(synchronize.call_count, 8)
 
                self.assertEqual(sleep.call_args_list, [
 
                        call(3), call(3), call(3), call(5), call(3),
 
                        call(3), call(3), call(3)])
 

	
 
    def test_synchronize(self):
 
        synchronizer = sync.Synchronizer(self.local_engine, self.remote_engines)
 

	
 
        with patch.object(synchronizer, 'synchronize_changes') as synchronize_changes:
 

	
 
            # no changes
 
            synchronizer.synchronize()
 
            self.assertFalse(synchronize_changes.called)
 

	
 
            # some changes
 
            local_session = Session(bind=self.local_engine)
 
            product = model.Product()
 
            local_session.add(product)
 
            local_session.flush()
 
            local_session.add(model.Change(class_name='Product', uuid=product.uuid, deleted=False))
 
            product = model.Product()
 
            local_session.add(product)
 
            local_session.flush()
 
            local_session.add(model.Change(class_name='Product', uuid=product.uuid, deleted=False))
 
            local_session.commit()
0 comments (0 inline, 0 general)