Changeset - 1e2491a5a9fa
Lance Edgar (lance) - 2013-11-16 01:07:57
lance@edbob.org
More db sync fixes (tested on Windows).
3 files changed with 11 insertions and 9 deletions:
rattail/db/sync/__init__.py
@@ -10,193 +10,193 @@
#  Rattail is free software: you can redistribute it and/or modify it under the
#  terms of the GNU Affero General Public License as published by the Free
#  Software Foundation, either version 3 of the License, or (at your option)
#  any later version.
#
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
#  FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
#  more details.
#
#  You should have received a copy of the GNU Affero General Public License
#  along with Rattail.  If not, see <http://www.gnu.org/licenses/>.
#
################################################################################

"""
``rattail.db.sync`` -- Database Synchronization
"""

import sys
import time
import logging

if sys.platform == 'win32': # pragma no cover
    import win32api

import sqlalchemy.exc
from sqlalchemy.orm import class_mapper
from sqlalchemy.exc import OperationalError

import edbob

from rattail.db import Session
from rattail.db import model


log = logging.getLogger(__name__)


def get_sync_engines():
    edbob.init_modules(['edbob.db'])

    keys = edbob.config.get('rattail.db', 'syncs')
    if not keys:
        return None

    engines = {}
    for key in keys.split(','):
        key = key.strip()
        engines[key] = edbob.engines[key]
    log.debug("get_sync_engines: Found engine keys: %s" % ','.join(engines.keys()))
    return engines


def dependency_sort(x, y):
    map_x = class_mapper(getattr(model, x))
    map_y = class_mapper(getattr(model, y))

    dep_x = []
    table_y = map_y.tables[0].name
    for column in map_x.columns:
        for key in column.foreign_keys:
            if key.column.table.name == table_y:
                return 1
            dep_x.append(key)

    dep_y = []
    table_x = map_x.tables[0].name
    for column in map_y.columns:
        for key in column.foreign_keys:
            if key.column.table.name == table_x:
                return -1
            dep_y.append(key)

    if dep_x and not dep_y:
        return 1
    if dep_y and not dep_x:
        return -1
    return 0
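
# A brief illustration, not part of the module, assuming the Rattail model
# where Product carries a foreign key to Subdepartment: dependency_sort() is a
# Python 2 cmp-style comparator that returns 1 when x depends on y, so
# referenced classes sort first.  synchronize_changes() below uses it after a
# lexical pre-sort, roughly like this:
#
#     names = sorted(set(['Product', 'Subdepartment']))   # lexical: Product first
#     names.sort(cmp=dependency_sort)
#     # names -> ['Subdepartment', 'Product'], since Product references Subdepartment
#
# (A Python 3 port would need functools.cmp_to_key() here, since sort(cmp=...)
# no longer exists.)
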
class Synchronizer(object):
    """
    Default implementation of database synchronization logic.  Subclass this if
    you have special processing needs.
    """

    def __init__(self, local_engine, remote_engines):
        self.local_engine = local_engine
        self.remote_engines = remote_engines

    def loop(self):
        log.info("Synchronizer.loop: using remote engines: {0}".format(
                ', '.join(self.remote_engines.iterkeys())))
        while True:
            try:
-                self.synchronize_changes()
+                self.synchronize()
            except OperationalError, error:
                if error.connection_invalidated:
                    # Presumably a database server restart; give it a moment
                    # and try again.
                    self.sleep(5)
                else:
                    raise
            self.sleep(3)

    def sleep(self, seconds): # pragma no cover
        if sys.platform == 'win32':
            win32api.Sleep(seconds * 1000)
        else:
            time.sleep(seconds)

    def synchronize(self):
        local_session = Session(bind=self.local_engine)
        local_changes = local_session.query(model.Change).all()
        log.debug("Synchronizer.synchronize: found {0} changes to synchronize".format(len(local_changes)))
        if len(local_changes):

            remote_sessions = {}
            for key, remote_engine in self.remote_engines.iteritems():
                remote_sessions[key] = Session(bind=remote_engine)

            self.synchronize_changes(local_changes, local_session, remote_sessions)

            for remote_session in remote_sessions.itervalues():
                remote_session.commit()
                remote_session.close()
            local_session.commit()

            log.debug("Synchronizer.synchronize: synchronization complete")
        local_session.close()

    def synchronize_changes(self, local_changes, local_session, remote_sessions):

        # First we must determine which types of instances are in need of
        # syncing.  The order will matter because of class dependencies.
        # However the dependency_sort() call doesn't *quite* take care of
        # everything - notably the Product/ProductPrice situation.  Since those
        # classes are mutually dependent, we start with a hackish lexical sort
        # and hope for the best...
        class_names = sorted(set([x.class_name for x in local_changes]))
        class_names.sort(cmp=dependency_sort)

        for class_name in class_names:
            klass = getattr(model, class_name)

            for change in [x for x in local_changes if x.class_name == class_name]:
                log.debug("Synchronizer.synchronize_changes: processing change: {0}".format(repr(change)))

                if change.deleted:
                    for remote_session in remote_sessions.itervalues():
                        remote_instance = remote_session.query(klass).get(change.uuid)
                        if remote_instance:
                            self.delete_instance(remote_session, remote_instance)
                            remote_session.flush()

                else: # new/dirty
                    local_instance = local_session.query(klass).get(change.uuid)
                    if local_instance:
                        for remote_session in remote_sessions.itervalues():
                            self.merge_instance(remote_session, local_instance)
                            remote_session.flush()

                local_session.delete(change)
                local_session.flush()

    def merge_instance(self, session, instance):
        """
        Merge ``instance`` into ``session``.

        This method checks for other "special" methods based on the class of
        ``instance``.  If such a method is found, it is invoked; otherwise a
        simple merge is done.
        """

        cls = instance.__class__.__name__
        if hasattr(self, 'merge_%s' % cls):
            return getattr(self, 'merge_%s' % cls)(session, instance)

        return session.merge(instance)

    def merge_Product(self, session, source_product):
        """
        This method is somewhat of a hack, in order to properly handle
        :class:`rattail.db.model.Product` instances and the interdependent
        nature of the related :class:`rattail.db.model.ProductPrice` instances.
        """

        target_product = session.merge(source_product)

        # I'm not 100% sure I understand this correctly, but here's my
        # thinking: First we clear the price relationships in case they've
        # actually gone away; then we re-establish any which are currently

@@ -210,102 +210,104 @@ class Synchronizer(object):
        # If the source instance has currently valid price relationships, then
        # we re-establish them.  We must merge the source price instance in
        # order to be certain it will exist in the target session, and avoid
        # foreign key errors.  However we *still* must also set the UUID fields
        # because again, the ORM is watching those...  This was noticed to be
        # the source of some bugs where successive database syncs were
        # effectively "toggling" the price relationship.  Setting the UUID
        # field explicitly seems to solve it.
        if source_product.regular_price_uuid:
            target_product.regular_price = session.merge(source_product.regular_price)
            target_product.regular_price_uuid = target_product.regular_price.uuid
        if source_product.current_price_uuid:
            target_product.current_price = session.merge(source_product.current_price)
            target_product.current_price_uuid = target_product.current_price.uuid

        return target_product

    def delete_instance(self, session, instance):
        """
        Delete ``instance`` using ``session``.

        This method checks for other "special" methods based on the class of
        ``instance``.  If such a method is found, it is invoked before the
        instance is officially deleted from ``session``.
        """

        cls = instance.__class__.__name__
        if hasattr(self, 'delete_%s' % cls):
            getattr(self, 'delete_%s' % cls)(session, instance)

        session.delete(instance)

    def delete_Department(self, session, department):

        # Remove association from Subdepartment records.
        q = session.query(model.Subdepartment)
        q = q.filter(model.Subdepartment.department == department)
        for subdepartment in q:
            subdepartment.department = None

        # Remove association from Product records.
        q = session.query(model.Product)
        q = q.filter(model.Product.department == department)
        for product in q:
            product.department = None

    def delete_Subdepartment(self, session, subdepartment):

        # Remove association from Product records.
        q = session.query(model.Product)
        q = q.filter(model.Product.subdepartment == subdepartment)
        for product in q:
            product.subdepartment = None

    def delete_Family(self, session, family):

        # Remove Product associations.
        products = session.query(model.Product)\
            .filter(model.Product.family == family)
        for product in products:
            product.family = None

    def delete_Vendor(self, session, vendor):

        # Remove associated ProductCost records.
        q = session.query(model.ProductCost)
        q = q.filter(model.ProductCost.vendor == vendor)
        for cost in q:
            session.delete(cost)

    def delete_CustomerGroup(self, session, group):
        # First remove customer associations.
        q = session.query(model.CustomerGroupAssignment)\
            .filter(model.CustomerGroupAssignment.group == group)
        for assignment in q:
            session.delete(assignment)


def synchronize_changes(local_engine, remote_engines):
    """
    This function will instantiate a ``Synchronizer`` class according to
    configuration.  (The default class is :class:`Synchronizer`.)  This
    instance is then responsible for implementing the sync logic.

    .. highlight:: ini

    If you need to override the default synchronizer class, put something like
    the following in your config file::

       [rattail.db]
       sync.synchronizer_class = myapp.sync:MySynchronizer
    """

    factory = edbob.config.get('rattail.db', 'sync.synchronizer_class')
    if factory:
        log.debug("synchronize_changes: using custom synchronizer factory: {0}".format(repr(factory)))
        factory = edbob.load_spec(factory)
    else:
        log.debug("synchronize_changes: using default synchronizer factory")
        factory = Synchronizer

    synchronizer = factory(local_engine, remote_engines)
    synchronizer.loop()
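
The merge_instance()/delete_instance() hooks and the `sync.synchronizer_class` setting above combine into a simple extension point: name a method after the model class being synced and point the config at your subclass. The sketch below is illustrative only -- the module path `myapp.sync`, the class `MySynchronizer`, and the `merge_Customer` hook are hypothetical stand-ins matching the docstring's example spec, not part of this changeset.

# myapp/sync.py (hypothetical), referenced from the config file as:
#
#   [rattail.db]
#   sync.synchronizer_class = myapp.sync:MySynchronizer

from rattail.db.sync import Synchronizer


class MySynchronizer(Synchronizer):

    def merge_Customer(self, session, customer):
        # Found by merge_instance() via its 'merge_%s' lookup; do any special
        # fix-up for Customer records here, then merge as usual.
        return session.merge(customer)

With that in place, the module-level synchronize_changes() loads the class via edbob.load_spec() and uses it as the factory in place of the default Synchronizer.
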
rattail/db/sync/win32.py
#!/usr/bin/env python
# -*- coding: utf-8  -*-
################################################################################
#
#  Rattail -- Retail Software Framework
#  Copyright © 2010-2012 Lance Edgar
#
#  This file is part of Rattail.
#
#  Rattail is free software: you can redistribute it and/or modify it under the
#  terms of the GNU Affero General Public License as published by the Free
#  Software Foundation, either version 3 of the License, or (at your option)
#  any later version.
#
#  Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
#  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
#  FOR A PARTICULAR PURPOSE.  See the GNU Affero General Public License for
#  more details.
#
#  You should have received a copy of the GNU Affero General Public License
#  along with Rattail.  If not, see <http://www.gnu.org/licenses/>.
#
################################################################################

"""
``rattail.db.sync.win32`` -- Database Synchronization for Windows
"""

import sys
import logging
import threading

import edbob
-from edbob.win32 import Service
+from edbob import db

from rattail.win32.service import Service
from rattail.db.sync import get_sync_engines, synchronize_changes


log = logging.getLogger(__name__)


class DatabaseSynchronizerService(Service):
    """
    Implements database synchronization as a Windows service.
    """

    _svc_name_ = 'RattailDatabaseSynchronizer'
    _svc_display_name_ = "Rattail : Database Synchronization Service"
    _svc_description_ = ("Monitors the local Rattail database for changes, "
                         "and synchronizes them to the configured remote "
                         "database(s).")

    appname = 'rattail'

    def Initialize(self):
        """
        Service initialization.
        """

        if not Service.Initialize(self):
            return False

        edbob.init_modules(['rattail.db'])

-        engines = get_sync_engines()
-        if not engines:
+        remote_engines = get_sync_engines()
+        if not remote_engines:
            return False

        thread = threading.Thread(target=synchronize_changes,
-                                  args=(engines,))
+                                  args=(db.engine, remote_engines))
        thread.daemon = True
        thread.start()

        return True


if __name__ == '__main__':
    if sys.platform == 'win32':
        import win32serviceutil
        win32serviceutil.HandleCommandLine(DatabaseSynchronizerService)
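
Outside the service wrapper, the same wiring can be run in the foreground, which is handy for exercising the fix (the new `(db.engine, remote_engines)` arguments) without installing the Windows service. This is a sketch only: it assumes the edbob/rattail configuration has already been loaded, as the service framework would normally arrange before Initialize() runs, and `run_sync_in_foreground` is a hypothetical helper, not part of the changeset.

import edbob
from edbob import db

from rattail.db.sync import get_sync_engines, synchronize_changes


def run_sync_in_foreground():
    # Mirrors DatabaseSynchronizerService.Initialize(), minus the thread:
    # synchronize_changes() loops forever, so calling it directly blocks here.
    edbob.init_modules(['rattail.db'])

    remote_engines = get_sync_engines()
    if not remote_engines:
        return False

    synchronize_changes(db.engine, remote_engines)

On Windows proper, the `if __name__ == '__main__'` block above lets the module be managed through the standard win32serviceutil command-line handling (install, start, and so on).
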
tests/db/sync/test_init.py
import warnings
from unittest import TestCase
from mock import patch, call, Mock, DEFAULT

from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError, SAWarning

from rattail.db import sync
from rattail.db import Session
from rattail.db import model


class SynchronizerTests(TestCase):

    def setUp(self):
        self.local_engine = create_engine('sqlite://')
        self.remote_engines = {
            'one': create_engine('sqlite://'),
            'two': create_engine('sqlite://'),
            }
        model.Base.metadata.create_all(bind=self.local_engine)
        model.Base.metadata.create_all(bind=self.remote_engines['one'])
        model.Base.metadata.create_all(bind=self.remote_engines['two'])

    def test_init(self):
        synchronizer = sync.Synchronizer(self.local_engine, self.remote_engines)
        self.assertIs(synchronizer.local_engine, self.local_engine)
        self.assertIs(synchronizer.remote_engines, self.remote_engines)

    def test_loop(self):

        class FakeOperationalError(OperationalError):
            def __init__(self, connection_invalidated):
                self.connection_invalidated = connection_invalidated

        synchronizer = sync.Synchronizer(self.local_engine, self.remote_engines)
        with patch.object(synchronizer, 'sleep') as sleep:
-            with patch.object(synchronizer, 'synchronize_changes') as synchronize_changes:
+            with patch.object(synchronizer, 'synchronize') as synchronize:

-                synchronize_changes.side_effect = [1, 2, 3, FakeOperationalError(True),
+                synchronize.side_effect = [1, 2, 3, FakeOperationalError(True),
                                                   5, 6, 7, FakeOperationalError(False)]
                self.assertRaises(FakeOperationalError, synchronizer.loop)
-                self.assertEqual(synchronize_changes.call_count, 8)
+                self.assertEqual(synchronize.call_count, 8)
                self.assertEqual(sleep.call_args_list, [
                        call(3), call(3), call(3), call(5), call(3),
                        call(3), call(3), call(3)])

    def test_synchronize(self):
        synchronizer = sync.Synchronizer(self.local_engine, self.remote_engines)

        with patch.object(synchronizer, 'synchronize_changes') as synchronize_changes:

            # no changes
            synchronizer.synchronize()
            self.assertFalse(synchronize_changes.called)

            # some changes
            local_session = Session(bind=self.local_engine)
            product = model.Product()
            local_session.add(product)
            local_session.flush()
            local_session.add(model.Change(class_name='Product', uuid=product.uuid, deleted=False))
            product = model.Product()
            local_session.add(product)
            local_session.flush()
            local_session.add(model.Change(class_name='Product', uuid=product.uuid, deleted=False))
            local_session.commit()
            synchronizer.synchronize()
            self.assertEqual(synchronize_changes.call_count, 1)
            # call_args is a tuple of (args, kwargs) - first element of args should be our 2 changes
            self.assertEqual(len(synchronize_changes.call_args[0][0]), 2)
            self.assertIsInstance(synchronize_changes.call_args[0][0][0], model.Change)

    def test_synchronize_changes(self):
        synchronizer = sync.Synchronizer(self.local_engine, self.remote_engines)

        local_session = Session(bind=self.local_engine)
        remote_sessions = {
            'one': Session(bind=self.remote_engines['one']),
            'two': Session(bind=self.remote_engines['two']),
            }

        # no changes; nothing should happen but make sure nothing blows up also
        local_changes = []
        synchronizer.synchronize_changes(local_changes, local_session, remote_sessions)

        # add a product, with change
        product = model.Product()
        local_session.add(product)
        local_session.flush()
        change = model.Change(class_name='Product', uuid=product.uuid, deleted=False)
        local_session.add(change)
        local_session.flush()
        self.assertEqual(local_session.query(model.Product).count(), 1)
        self.assertEqual(local_session.query(model.Change).count(), 1)

        # remote sessions don't have the product yet
        self.assertEqual(remote_sessions['one'].query(model.Product).count(), 0)
        self.assertEqual(remote_sessions['two'].query(model.Product).count(), 0)

        # sync the change
        synchronizer.synchronize_changes([change], local_session, remote_sessions)
        self.assertEqual(local_session.query(model.Product).count(), 1)
        self.assertEqual(local_session.query(model.Change).count(), 0)

        # remote session 'one' has the product
        self.assertEqual(remote_sessions['one'].query(model.Product).count(), 1)
        remote_product_1 = remote_sessions['one'].query(model.Product).one()
        self.assertEqual(remote_product_1.uuid, product.uuid)

        # remote session 'two' has the product
        self.assertEqual(remote_sessions['two'].query(model.Product).count(), 1)
        remote_product_2 = remote_sessions['two'].query(model.Product).one()
        self.assertEqual(remote_product_2.uuid, product.uuid)

        # delete the product (new change)
        local_session.delete(product)
        change = model.Change(class_name='Product', uuid=product.uuid, deleted=True)
        local_session.add(change)
        local_session.flush()
        self.assertEqual(local_session.query(model.Product).count(), 0)
        self.assertEqual(local_session.query(model.Change).count(), 1)

        # sync the change
        synchronizer.synchronize_changes([change], local_session, remote_sessions)
        self.assertEqual(local_session.query(model.Change).count(), 0)

        # remote sessions no longer have the product
        self.assertEqual(remote_sessions['one'].query(model.Product).count(), 0)
        self.assertEqual(remote_sessions['two'].query(model.Product).count(), 0)

    def test_merge_instance(self):

        class FakeClass(object):
            pass

        synchronizer = sync.Synchronizer(self.local_engine, self.remote_engines)
        session = Mock()
        instance = FakeClass()