Files @ 0d155bc1eace
Branch filter:

Location: rattail-project/rattail/tests/db/test_model.py

lance
Tweak docs.
# -*- coding: utf-8 -*-

from unittest import TestCase
from . import DataTestCase
from mock import patch, DEFAULT, Mock, MagicMock

from sqlalchemy import String, Boolean, Numeric
from sqlalchemy.exc import IntegrityError

from rattail.db import model
from rattail.db.types import GPCType
from rattail.db.changes import record_changes


class SAErrorHelper(object):

    def integrity_or_flush_error(self):
        try:
            from sqlalchemy.exc import FlushError
        except ImportError:
            return IntegrityError
        else:
            return (IntegrityError, FlushError)


class TestPerson(DataTestCase):

    def test_default_display_name_is_generated_from_first_and_last_name_if_both_provided(self):
        person = model.Person(first_name='Fred', last_name='Flintstone')
        self.assertTrue(person.display_name is None)
        self.session.add(person)
        self.session.flush()
        self.assertEqual(person.display_name, 'Fred Flintstone')

    def test_default_display_name_is_generated_from_first_name_if_no_last_name_provided(self):
        person = model.Person(first_name='Fred')
        self.assertTrue(person.display_name is None)
        self.session.add(person)
        self.session.flush()
        self.assertEqual(person.display_name, 'Fred')

    def test_default_display_name_is_generated_from_last_name_if_no_first_name_provided(self):
        person = model.Person(last_name='Flintstone')
        self.assertTrue(person.display_name is None)
        self.session.add(person)
        self.session.flush()
        self.assertEqual(person.display_name, 'Flintstone')


class TestBatch(TestCase):

    @patch('rattail.db.model.object_session')
    def test_rowclass(self, object_session):
        object_session.return_value = object_session

        # no row classes to start with
        self.assertEqual(model.Batch._rowclasses, {})

        # basic, empty row class
        batch = model.Batch(uuid='some_uuid')
        batch.get_sqlalchemy_type = Mock(return_value='some_type')
        batch.columns = MagicMock()
        rowclass = batch.rowclass
        self.assertTrue(issubclass(rowclass, model.BatchRow))
        self.assertEqual(model.Batch._rowclasses.keys(), ['some_uuid'])
        self.assertTrue(model.Batch._rowclasses['some_uuid'] is rowclass)
        self.assertFalse(object_session.flush.called)

        # make sure rowclass.batch works
        object_session.query.return_value.get.return_value = batch
        self.assertTrue(rowclass().batch is batch)
        object_session.query.return_value.get.assert_called_once_with('some_uuid')

        # row class with generated uuid and some columns
        batch = model.Batch(uuid=None)
        batch.columns = [model.BatchColumn(name='F01'), model.BatchColumn(name='F02')]
        model.Batch.get_sqlalchemy_type = Mock(return_value=String(length=20))
        def set_uuid():
            batch.uuid = 'fresh_uuid'
        object_session.flush.side_effect = set_uuid
        rowclass = batch.rowclass
        object_session.flush.assert_called_once_with()
        self.assertEqual(sorted(model.Batch._rowclasses.keys()), sorted(['some_uuid', 'fresh_uuid']))
        self.assertTrue(model.Batch._rowclasses['fresh_uuid'] is rowclass)

    def test_get_sqlalchemy_type(self):

        # gpc
        self.assertTrue(isinstance(model.Batch.get_sqlalchemy_type('GPC(14)'), GPCType))

        # boolean
        self.assertTrue(isinstance(model.Batch.get_sqlalchemy_type('FLAG(1)'), Boolean))

        # string
        type_ = model.Batch.get_sqlalchemy_type('CHAR(20)')
        self.assertTrue(isinstance(type_, String))
        self.assertEqual(type_.length, 20)

        # numeric
        type_ = model.Batch.get_sqlalchemy_type('NUMBER(9,3)')
        self.assertTrue(isinstance(type_, Numeric))
        self.assertEqual(type_.precision, 9)
        self.assertEqual(type_.scale, 3)

        # invalid
        self.assertRaises(AssertionError, model.Batch.get_sqlalchemy_type, 'CHAR(9,3)')
        self.assertRaises(AssertionError, model.Batch.get_sqlalchemy_type, 'OMGWTFBBQ')


class TestCustomer(DataTestCase):

    def test_repr(self):
        customer = model.Customer(uuid='whatever')
        self.assertEqual(repr(customer), "Customer(uuid='whatever')")

    def test_unicode(self):
        customer = model.Customer()
        self.assertEqual(unicode(customer), u'None')
        customer = model.Customer(name='Fred')
        self.assertEqual(unicode(customer), u'Fred')

    def test_cascade_delete_assignment(self):
        customer = model.Customer()
        assignment = model.CustomerGroupAssignment(
            customer=customer, group=model.CustomerGroup(), ordinal=1)
        self.session.add_all([customer, assignment])
        self.session.commit()
        self.assertEqual(self.session.query(model.CustomerGroupAssignment).count(), 1)
        self.session.delete(customer)
        self.session.commit()
        self.assertEqual(self.session.query(model.CustomerGroupAssignment).count(), 0)


class TestCustomerPerson(DataTestCase, SAErrorHelper):

    def test_repr(self):
        assoc = model.CustomerPerson(uuid='whatever')
        self.assertEqual(repr(assoc), "CustomerPerson(uuid='whatever')")

    def test_customer_required(self):
        assoc = model.CustomerPerson(person=model.Person())
        self.session.add(assoc)
        self.assertRaises(self.integrity_or_flush_error(), self.session.commit)
        self.session.rollback()
        self.assertEqual(self.session.query(model.CustomerPerson).count(), 0)
        assoc.customer = model.Customer()
        self.session.add(assoc)
        self.session.commit()
        self.assertEqual(self.session.query(model.CustomerPerson).count(), 1)

    def test_person_required(self):
        assoc = model.CustomerPerson(customer=model.Customer())
        self.session.add(assoc)
        self.assertRaises(IntegrityError, self.session.commit)
        self.session.rollback()
        self.assertEqual(self.session.query(model.CustomerPerson).count(), 0)
        assoc.person = model.Person()
        self.session.add(assoc)
        self.session.commit()
        self.assertEqual(self.session.query(model.CustomerPerson).count(), 1)

    def test_ordinal_autoincrement(self):
        customer = model.Customer()
        self.session.add(customer)
        assoc = model.CustomerPerson(person=model.Person())
        customer._people.append(assoc)
        self.session.commit()
        self.assertEqual(assoc.ordinal, 1)
        assoc = model.CustomerPerson(person=model.Person())
        customer._people.append(assoc)
        self.session.commit()
        self.assertEqual(assoc.ordinal, 2)


class TestCustomerGroupAssignment(DataTestCase, SAErrorHelper):

    def test_repr(self):
        assignment = model.CustomerGroupAssignment(uuid='whatever')
        self.assertEqual(repr(assignment), "CustomerGroupAssignment(uuid='whatever')")

    def test_customer_required(self):
        assignment = model.CustomerGroupAssignment(group=model.CustomerGroup())
        self.session.add(assignment)
        self.assertRaises(self.integrity_or_flush_error(), self.session.commit)
        self.session.rollback()
        self.assertEqual(self.session.query(model.CustomerGroupAssignment).count(), 0)
        assignment.customer = model.Customer()
        self.session.add(assignment)
        self.session.commit()
        self.assertEqual(self.session.query(model.CustomerGroupAssignment).count(), 1)

    def test_group_required(self):
        assignment = model.CustomerGroupAssignment(customer=model.Customer())
        self.session.add(assignment)
        self.assertRaises(IntegrityError, self.session.commit)
        self.session.rollback()
        self.assertEqual(self.session.query(model.CustomerGroupAssignment).count(), 0)
        assignment.group = model.CustomerGroup()
        self.session.add(assignment)
        self.session.commit()
        self.assertEqual(self.session.query(model.CustomerGroupAssignment).count(), 1)

    def test_ordinal_autoincrement(self):
        customer = model.Customer()
        self.session.add(customer)
        assignment = model.CustomerGroupAssignment(group=model.CustomerGroup())
        customer._groups.append(assignment)
        self.session.commit()
        self.assertEqual(assignment.ordinal, 1)
        assignment = model.CustomerGroupAssignment(group=model.CustomerGroup())
        customer._groups.append(assignment)
        self.session.commit()
        self.assertEqual(assignment.ordinal, 2)


class TestCustomerEmailAddress(DataTestCase):

    def test_pop(self):
        customer = model.Customer()
        customer.add_email_address('fred.home@mailinator.com')
        customer.add_email_address('fred.work@mailinator.com')
        self.session.add(customer)
        self.session.commit()
        self.assertEqual(self.session.query(model.Customer).count(), 1)
        self.assertEqual(self.session.query(model.CustomerEmailAddress).count(), 2)

        while customer.emails:
            customer.emails.pop()
        self.session.commit()
        self.assertEqual(self.session.query(model.Customer).count(), 1)
        self.assertEqual(self.session.query(model.CustomerEmailAddress).count(), 0)

        # changes weren't being recorded
        self.assertEqual(self.session.query(model.Change).count(), 0)

    def test_pop_with_changes(self):
        record_changes(self.session)

        customer = model.Customer()
        customer.add_email_address('fred.home@mailinator.com')
        customer.add_email_address('fred.work@mailinator.com')
        self.session.add(customer)
        self.session.commit()
        self.assertEqual(self.session.query(model.Customer).count(), 1)
        self.assertEqual(self.session.query(model.CustomerEmailAddress).count(), 2)

        while customer.emails:
            customer.emails.pop()
        self.session.commit()
        self.assertEqual(self.session.query(model.Customer).count(), 1)
        self.assertEqual(self.session.query(model.CustomerEmailAddress).count(), 0)

        # changes should have been recorded
        changes = self.session.query(model.Change)
        self.assertEqual(changes.count(), 3)

        customer_change = changes.filter_by(class_name='Customer').one()
        self.assertEqual(customer_change.uuid, customer.uuid)
        self.assertFalse(customer_change.deleted)

        email_changes = changes.filter_by(class_name='CustomerEmailAddress')
        self.assertEqual(email_changes.count(), 2)
        self.assertEqual([x.deleted for x in email_changes], [True, True])


class TestLabelProfile(DataTestCase):

    def test_get_printer_setting(self):
        profile = model.LabelProfile()
        self.session.add(profile)

        self.assertTrue(profile.uuid is None)
        setting = profile.get_printer_setting('some_setting')
        self.assertTrue(setting is None)
        self.assertTrue(profile.uuid is None)

        profile.uuid = 'some_uuid'
        self.session.add(model.Setting(
                name='labels.some_uuid.printer.some_setting',
                value='some_value'))
        self.session.flush()
        setting = profile.get_printer_setting('some_setting')
        self.assertEquals(setting, 'some_value')

    def test_save_printer_setting(self):
        self.assertEqual(self.session.query(model.Setting).count(), 0)
        profile = model.LabelProfile()
        self.session.add(profile)

        self.assertTrue(profile.uuid is None)
        profile.save_printer_setting('some_setting', 'some_value')
        self.assertFalse(profile.uuid is None)
        self.assertEqual(self.session.query(model.Setting).count(), 1)

        profile.uuid = 'some_uuid'
        profile.save_printer_setting('some_setting', 'some_value')
        self.assertEqual(self.session.query(model.Setting).count(), 2)
        setting = self.session.query(model.Setting)\
            .filter_by(name='labels.some_uuid.printer.some_setting')\
            .one()
        self.assertEqual(setting.value, 'some_value')