# -*- coding: utf-8; -*- ################################################################################ # # Rattail -- Retail Software Framework # Copyright © 2010-2021 Lance Edgar # # This file is part of Rattail. # # Rattail is free software: you can redistribute it and/or modify it under the # terms of the GNU General Public License as published by the Free Software # Foundation, either version 3 of the License, or (at your option) any later # version. # # Rattail is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # Rattail. If not, see . # ################################################################################ """ Handler for "customer order" batches """ from __future__ import unicode_literals, absolute_import, division import re import six import sqlalchemy as sa from sqlalchemy import orm from rattail.db import model from rattail.batch import BatchHandler class CustomerOrderBatchHandler(BatchHandler): """ Handler for all "customer order" batches, regardless of "mode". The handler must inspect the :attr:`~rattail.db.model.batch.custorder.CustomerOrderBatch.mode` attribute of each batch it deals with, in order to determine which logic to apply. .. attribute:: has_custom_product_autocomplete If true, this flag indicates that the handler provides custom autocomplete logic for use when selecting a product while creating a new order. """ batch_model_class = model.CustomerOrderBatch has_custom_product_autocomplete = False nondigits_pattern = re.compile(r'\D') def init_batch(self, batch, progress=None, **kwargs): """ Assign the "local" store to the batch, if applicable. """ session = self.app.get_session(batch) batch.store = self.config.get_store(session) def new_order_requires_customer(self): """ Returns a boolean indicating whether a *new* "customer order" in fact requires a proper customer account, or not. Note that in all cases a new order requires a *person* to associate with, but technically the customer is optional, unless this returns true. """ return self.config.getbool('rattail.custorders', 'new_order_requires_customer', default=False) def allow_contact_info_choice(self): """ Returns a boolean indicating whether the user is allowed at all, to choose from existing contact info options for the customer, vs. they just have to go with whatever the handler auto-provides. """ return self.config.getbool('rattail.custorders', 'new_orders.allow_contact_info_choice', default=True) def should_restrict_contact_info(self): """ Returns a boolean indicating whether contact info should be "restricted" - i.e. user can only choose from existing contact info and cannot override by e.g. entering a new phone number. """ return self.config.getbool('rattail.custorders', 'new_orders.restrict_contact_info', default=False) def assign_contact(self, batch, customer=None, person=None, **kwargs): """ Assign the customer and/or person "contact" for the order. """ clientele = self.app.get_clientele_handler() customer_required = self.new_order_requires_customer() # nb. person is always required if customer and not person: person = clientele.get_person(customer) if not person: raise ValueError("Must specify a person") # customer may or may not be optional if person and not customer: customer = clientele.get_customer(person) if customer_required and not customer: raise ValueError("Must specify a customer account") # assign contact batch.customer = customer batch.person = person # cache contact name batch.contact_name = self.get_contact_display(batch) # update phone/email per new contact batch.phone_number = None batch.email_address = None if customer_required: batch.phone_number = clientele.get_first_phone_number(customer) batch.email_address = clientele.get_first_email_address(customer) else: batch.phone_number = person.first_phone_number() batch.email_address = person.first_email_address() # always reset "add to customer" flags batch.clear_param('add_phone_number') batch.clear_param('add_email_address') session = self.app.get_session(batch) session.flush() def get_contact(self, batch): """ Should return the contact record (i.e. Customer or Person) for the batch. """ customer_required = self.new_order_requires_customer() if customer_required: return batch.customer else: return batch.person def get_contact_id(self, batch): """ Should return contact ID for the batch, i.e. customer ID. """ contact = self.get_contact(batch) if isinstance(contact, model.Customer): return contact.id def get_contact_display(self, batch): """ Should return contact display text for the batch, i.e. customer name. """ contact = self.get_contact(batch) if contact: return six.text_type(contact) pending = batch.pending_customer if pending: return six.text_type(pending) def get_contact_phones(self, batch): """ Retrieve all phone records on file for the batch contact, to be presented as options for user to choose from when making a new order. """ phones = [] contact = self.get_contact(batch) if contact: phones = contact.phones return [self.normalize_phone(phone) for phone in phones] def normalize_phone(self, phone): """ Normalize the given phone record to simple data dict, for passing around via JSON etc. """ return { 'uuid': phone.uuid, 'type': phone.type, 'number': phone.number, 'preference': phone.preference, 'preferred': phone.preference == 1, } def get_contact_emails(self, batch): """ Retrieve all email records on file for the batch contact, to be presented as options for user to choose from when making a new order. Note that the default logic will exclude invalid email addresses. """ emails = [] contact = self.get_contact(batch) if contact: emails = contact.emails # exclude invalid emails = [email for email in emails if not email.invalid] return [self.normalize_email(email) for email in emails] def normalize_email(self, email): """ Normalize the given email record to simple data dict, for passing around via JSON etc. """ return { 'uuid': email.uuid, 'type': email.type, 'address': email.address, 'invalid': email.invalid, 'preference': email.preference, 'preferred': email.preference == 1, } def get_contact_notes(self, batch): """ Get extra "contact notes" which should be made visible to the user who is entering the new order. """ notes = [] invalid = False contact = self.get_contact(batch) if contact: invalid = [email for email in contact.emails if email.invalid] if invalid: notes.append("Customer has one or more invalid email addresses on file.") return notes def unassign_contact(self, batch, **kwargs): """ Unassign the customer and/or person "contact" for the order. """ batch.customer = None batch.person = None # note that if batch already has a "pending" customer on file, # we will "restore" it as the contact info for the batch pending = batch.pending_customer if pending: batch.contact_name = pending.display_name batch.phone_number = pending.phone_number batch.email_address = pending.email_address else: batch.contact_name = None batch.phone_number = None batch.email_address = None # always reset "add to customer" flags batch.clear_param('add_phone_number') batch.clear_param('add_email_address') session = self.app.get_session(batch) session.flush() session.refresh(batch) def validate_pending_customer_data(self, batch, user, data): pass def update_pending_customer(self, batch, user, data): model = self.model people = self.app.get_people_handler() # first validate all data self.validate_pending_customer_data(batch, user, data) # clear out any contact it may have self.unassign_contact(batch) # create pending customer if needed pending = batch.pending_customer if not pending: pending = model.PendingCustomer() pending.user = user pending.status_code = self.enum.PENDING_CUSTOMER_STATUS_PENDING batch.pending_customer = pending # update pending customer info if 'first_name' in data: pending.first_name = data['first_name'] if 'last_name' in data: pending.last_name = data['last_name'] if 'display_name' in data: pending.display_name = data['display_name'] else: pending.display_name = people.normalize_full_name(pending.first_name, pending.last_name) if 'phone_number' in data: pending.phone_number = self.app.format_phone_number(data['phone_number']) if 'email_address' in data: pending.email_address = data['email_address'] # also update the batch w/ contact info batch.contact_name = pending.display_name batch.phone_number = pending.phone_number batch.email_address = pending.email_address def get_case_size_for_product(self, product): if product.case_size: return product.case_size cost = product.cost if cost: return cost.case_size # TODO: this method should maybe not exist? and caller just # invokes the handler directly instead? def customer_autocomplete(self, session, term, **kwargs): """ Override the Customer autocomplete, to search by phone number as well as name. """ autocompleter = self.app.get_autocompleter('customers.neworder') return autocompleter.autocomplete(session, term, **kwargs) # TODO: this method should maybe not exist? and caller just # invokes the handler directly instead? def person_autocomplete(self, session, term, **kwargs): """ Override the Person autocomplete, to search by phone number as well as name. """ autocompleter = self.app.get_autocompleter('people.neworder') return autocompleter.autocomplete(session, term, **kwargs) def get_customer_info(self, batch, **kwargs): """ Return a data dict containing misc. info pertaining to the customer/person for the order batch. """ info = { 'customer_uuid': None, 'person_uuid': None, 'phone_number': None, 'email_address': None, } if batch.customer: info['customer_uuid'] = batch.customer.uuid phone = batch.customer.first_phone() if phone: info['phone_number'] = phone.number email = batch.customer.first_email() if email: info['email_address'] = email.address if batch.person: info['person_uuid'] = batch.person.uuid if not info['phone_number']: phone = batch.person.first_phone() if phone: info['phone_number'] = phone.number email = batch.person.first_email() if email: info['email_address'] = email.address return info def custom_product_autocomplete(self, session, term, **kwargs): """ For the given term, this should return a (possibly empty) list of products which "match" the term. Each element in the list should be a dict with "label" and "value" keys. """ raise NotImplementedError("Please define the " "{}.custom_product_autocomplete() " "method.".format(__class__.__name__)) def why_not_add_product(self, product, batch): """ This method can inspect the given product, and batch, to determine if the product may be added to the batch as a new row. Useful to e.g. prevent one customer from ordering too many things, etc. :returns: If there is a reason not to add the product, should return that reason as a string; otherwise ``None``. """ def refresh_row(self, row): if not row.product: if row.item_entry: session = orm.object_session(row) # TODO: should do more than just query for uuid here product = session.query(model.Product).get(row.item_entry) if product: row.product = product if not row.product: row.status_code = row.STATUS_PRODUCT_NOT_FOUND return product = row.product row.product_upc = product.upc row.product_brand = six.text_type(product.brand or "") row.product_description = product.description row.product_size = product.size row.product_weighed = product.weighed row.case_quantity = self.get_case_size_for_product(product) department = product.department row.department_number = department.number if department else None row.department_name = department.name if department else None cost = product.cost row.product_unit_cost = cost.unit_cost if cost else None regprice = product.regular_price row.unit_price = regprice.price if regprice else None # we need to know if total price is updated old_total = row.total_price # maybe update total price if row.unit_price is None: row.total_price = None elif not row.unit_price: row.total_price = 0 else: row.total_price = row.unit_price * row.order_quantity if row.order_uom == self.enum.UNIT_OF_MEASURE_CASE: row.total_price *= (row.case_quantity or 1) # update total price for batch too, if it changed if row.total_price != old_total: batch = row.batch batch.total_price = ((batch.total_price or 0) + (row.total_price or 0) - (old_total or 0)) row.status_code = row.STATUS_OK def remove_row(self, row): batch = row.batch if not row.removed: row.removed = True if row.total_price: batch.total_price = (batch.total_price or 0) - row.total_price self.refresh_batch_status(batch) def execute(self, batch, user=None, progress=None, **kwargs): """ Default behavior here will create and return a new rattail Customer Order. It also may "add contact info" e.g. to the customer record. Override as needed. """ order = self.make_new_order(batch, user=user, progress=progress, **kwargs) self.update_contact_info(batch, user) return order def update_contact_info(self, batch, user, **kwargs): """ Update contact info from the batch, onto the customer record. """ if batch.get_param('add_phone_number'): self.add_phone_number(batch, user) if batch.get_param('add_email_address'): self.add_email_address(batch, user) def add_phone_number(self, batch, user, **kwargs): """ Add phone number from the batch to the customer record. Note that the default behavior does *not* do that, but instead will send an email alert to configured recipient(s) with the update request. """ self.app.send_email('new_phone_requested', { 'user': user, 'user_display': user.display_name if user else "(unknown user)", 'contact': self.get_contact(batch), 'contact_id': self.get_contact_id(batch), 'phone_number': batch.phone_number, }) def add_email_address(self, batch, user, **kwargs): """ Add email address from the batch to the customer record. Note that the default behavior does *not* do that, but instead will send an email alert to configured recipient(s) with the update request. """ self.app.send_email('new_email_requested', { 'user': user, 'user_display': user.display_name if user else "(unknown user)", 'contact': self.get_contact(batch), 'contact_id': self.get_contact_id(batch), 'email_address': batch.email_address, }) def make_new_order(self, batch, user=None, progress=None, **kwargs): """ Create and return a new rattail Customer Order based on the batch contents. """ batch_fields = [ 'store', 'id', 'customer', 'person', 'pending_customer', 'phone_number', 'email_address', 'total_price', ] order = model.CustomerOrder() order.created_by = user order.status_code = self.enum.CUSTORDER_STATUS_ORDERED for field in batch_fields: setattr(order, field, getattr(batch, field)) row_fields = [ 'product', 'product_upc', 'product_brand', 'product_description', 'product_size', 'product_weighed', 'department_number', 'department_name', 'case_quantity', 'order_quantity', 'order_uom', 'product_unit_cost', 'unit_price', 'discount_percent', 'total_price', 'paid_amount', 'payment_transaction_number', ] def convert(row, i): item = model.CustomerOrderItem() item.sequence = i item.status_code = self.enum.CUSTORDER_ITEM_STATUS_INITIATED for field in row_fields: setattr(item, field, getattr(row, field)) order.items.append(item) # attach event item.events.append(model.CustomerOrderItemEvent( type_code=self.enum.CUSTORDER_ITEM_EVENT_INITIATED, user=user)) self.progress_loop(convert, batch.active_rows(), progress, message="Converting batch rows to order items") session = orm.object_session(batch) session.add(order) session.flush() return order