Files
@ bd964b926682
Branch filter:
Location: rattail-project/rattail/rattail/batch/inventory.py
bd964b926682
15.2 KiB
text/x-python
Add support for syncing roles, with users and permissions for each,
but only those roles marked for sync. Also, by default the GlobalRole
is *not* included in the handler's default list, so this still
requires a bit of setup.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 | # -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Handler for inventory batches
"""
from __future__ import unicode_literals, absolute_import
import decimal
import logging
import six
import sqlalchemy as sa
from sqlalchemy import orm
from rattail.db import api, model
from rattail.gpc import GPC
from rattail.batch import BatchHandler
log = logging.getLogger(__name__)
class InventoryBatchHandler(BatchHandler):
    """
    Handler for inventory batches.
    """
    batch_model_class = model.InventoryBatch

    # set to False to disable "variance" batch count mode
    allow_variance = True

    # set to False to disable "zero all" batch count mode
    allow_zero_all = True

    # when user scans unknown product, warn but do not create row
    unknown_product_creates_row = False

    # set to False to prevent exposing case fields for user input,
    # when the batch count mode is "adjust only"
    allow_adjustment_cases = True

    def init_batch(self, batch, **kwargs):
        """
        Make sure the batch has a numeric total cost; a missing value is
        replaced with zero.
        """
        if batch.total_cost is None:
            batch.total_cost = 0

    def allow_cases(self, batch):
        """
        Must return a boolean to indicate whether "case counts" may be entered
        for the batch (in addition to "unit counts"), as opposed to unit counts
        only.
        """
        if batch.mode == self.enum.INVENTORY_MODE_ADJUST:
            return self.allow_adjustment_cases
        return True

    def get_count_modes(self):
        """
        Should return a list of "count modes" from which a user might select
        when creating a new inventory batch, for instance.  Each mode should be
        represented as a JSON-serializable dict.

        Note that this method should return *all* possible modes; per-user
        restrictions etc. are handled elsewhere.
        """
        # map each mode code back to the name of its enum constant
        code_names = {}
        for name in dir(self.enum):
            if name.startswith('INVENTORY_MODE_'):
                code_names[getattr(self.enum, name)] = name

        modes = []
        for key, label in self.enum.INVENTORY_MODE.items():
            name = code_names[key]

            # omit modes which this handler has been told to disable
            if name == 'INVENTORY_MODE_ZERO_ALL':
                if not self.allow_zero_all:
                    continue
            if name == 'INVENTORY_MODE_VARIANCE':
                if not self.allow_variance:
                    continue

            modes.append({
                'code': key,
                'code_name': name,
                'label': label,
            })
        return modes

    def get_allowed_count_modes(self, session, user,
                                permission_prefix='inventory'):
        """
        Should return a list of "count modes" from which the given user can
        select when creating a new inventory batch.  Each mode should be
        represented as a JSON-serializable dict.

        Note that this method should return only those modes which the given
        user is *allowed* to create, according to permissions.
        """
        auth = self.app.get_auth_handler()
        all_modes = self.get_count_modes()
        modes = []

        def has_perm(name):
            name = '{}.{}'.format(permission_prefix, name)
            return auth.has_permission(session, user, name)

        for mode in all_modes:

            # "replace" modes
            if mode['code_name'] in ('INVENTORY_MODE_REPLACE',
                                     'INVENTORY_MODE_REPLACE_ADJUST'):
                if not has_perm('create.replace'):
                    continue

            # "zero all" mode
            if mode['code_name'] == 'INVENTORY_MODE_ZERO_ALL':
                if not has_perm('create.zero'):
                    continue

            # "variance" mode
            if mode['code_name'] == 'INVENTORY_MODE_VARIANCE':
                if not has_perm('create.variance'):
                    continue

            # user is allowed to create this mode
            modes.append(mode)

        return modes

    def get_adjustment_reasons(self, session):
        """
        Returns the list of Inventory Adjustment Reasons from the database,
        ordered by code.  Reasons which are flagged as hidden are excluded.
        """
        # nb. the `== None` / `== False` comparisons are intentional; they
        # are SQLAlchemy column expressions and not plain Python tests
        reasons = session.query(model.InventoryAdjustmentReason)\
                         .filter(sa.or_(
                             model.InventoryAdjustmentReason.hidden == None,
                             model.InventoryAdjustmentReason.hidden == False))\
                         .order_by(model.InventoryAdjustmentReason.code)\
                         .all()
        return reasons

    def should_populate(self, batch):
        """
        Must return a boolean indicating whether the batch should be
        populated with initial rows.  This is true when the batch has
        handheld batches attached, or when its mode is "zero all".
        """
        if batch.handheld_batches:
            return True
        if batch.mode == self.enum.INVENTORY_MODE_ZERO_ALL:
            return True
        return False

    def populate(self, batch, progress=None):
        """
        Pre-fill batch with row data, either from its handheld batches or,
        for "zero all" mode, from the products which currently have a
        non-zero on-hand quantity.
        """
        if batch.handheld_batches:
            self.populate_from_handheld(batch, progress=progress)
        elif batch.mode == self.enum.INVENTORY_MODE_ZERO_ALL:
            self.populate_zero_all(batch, progress=progress)

    def populate_from_handheld(self, batch, progress=None):
        """
        Add one row to the batch for each active row found within the
        batch's handheld batches, copying the UPC and any counts.
        """

        def append(hh, i):
            row = model.InventoryBatchRow()
            row.upc = hh.upc
            if hh.cases is not None:
                row.cases = hh.cases
            if hh.units is not None:
                row.units = hh.units
            self.add_row(batch, row)

        data = []
        for handheld in batch.handheld_batches:
            data.extend(handheld.active_rows())

        self.progress_loop(append, data, progress,
                           message="Adding initial rows to batch")

    def populate_zero_all(self, batch, progress=None):
        """
        Add one row, with a unit count of zero, for each product whose
        inventory on-hand quantity is neither null nor zero.
        """
        session = orm.object_session(batch)

        # nb. `!= None` is intentional; it is a SQLAlchemy column expression
        products = session.query(model.Product)\
                          .join(model.ProductInventory)\
                          .filter(model.ProductInventory.on_hand != None)\
                          .filter(model.ProductInventory.on_hand != 0)

        def append(product, i):
            row = model.InventoryBatchRow()
            row.product = product
            row.units = 0
            self.add_row(batch, row)

        self.progress_loop(append, products, progress,
                           message="Adding initial rows to batch")

    def should_aggregate_products(self, batch):
        """
        Must return a boolean indicating whether rows should be aggregated by
        product for the given batch.

        If rows *should* be aggregated, what this means is, when a user enters
        a barcode, which is already contained within the batch, then that row
        should be "added to" as opposed to creating a new row.
        """
        if batch.mode == self.enum.INVENTORY_MODE_VARIANCE:
            return True
        return False

    def find_row_for_product(self, session, batch, product):
        """
        Locate and return the batch row which matches the given product, if one
        exists.  Otherwise returns ``None``.

        If multiple (non-removed) rows match, a warning is logged and the
        first one is returned.
        """
        # prefer the session to which the batch belongs, but fall back to the
        # one provided by the caller in case the batch is not attached
        session = orm.object_session(batch) or session

        rows = session.query(model.InventoryBatchRow)\
                      .filter(model.InventoryBatchRow.batch == batch)\
                      .filter(model.InventoryBatchRow.product == product)\
                      .filter(model.InventoryBatchRow.removed == False)\
                      .all()
        if rows:
            if len(rows) > 1:
                log.warning("inventory batch %s has %s rows for product %s: %s",
                            batch.id_str, len(rows), product.uuid, product)
            return rows[0]
        return None

    def get_type2_product_info(self, session, entry):
        """
        Try to locate the product for the given "Type 2" UPC.  This method
        should do basic validation on the UPC and will only attempt the product
        lookup if it does appear to be a proper Type 2 UPC.

        Note that this logic is *not* implemented by default; you must override
        with your own logic if you need to support this use case.

        :param entry: Should be a "raw" UPC as text, i.e. as entered by the user.

        :returns: If product is found, this method should return a 2-tuple
           containing the product, and the (decimal) "price" embedded in the
           UPC.  But if no product could be found, this returns ``None``.
        """

    def quick_entry(self, session, batch, entry):
        """
        Quick entry is assumed to be a UPC scan or similar user input.  Product
        lookup will be based on this.

        Behavior of this method may vary according to config, but in general
        the idea is, either locate an existing batch row, or add a new one,
        based on the user input.  The row will be returned, if found/created.

        :raises ValueError: If the entry has more than 14 characters, or if
           no product is found and the handler is not configured to create
           rows for unknown products.
        """
        if len(entry) > 14:
            raise ValueError("UPC has too many ({}) digits: {}".format(
                len(entry), entry))

        # first we try to locate the product, either by interpreting a "type 2"
        # barcode, or else assuming a normal UPC / ID
        type2 = self.get_type2_product_info(session, entry)
        if type2:
            product, embedded_price = type2
        else:
            product = self.locate_product_for_entry(session, entry,
                                                    lookup_by_code=True)

        if product:

            if product.is_pack_item():
                # TODO: should rename this setting
                force_unit_item = self.config.getbool(
                    'tailbone', 'inventory.force_unit_item', default=False)
                if force_unit_item:
                    product = product.unit
                    # set a flag so caller can inspect/know what happened here
                    product.__forced_unit_item__ = True

            aggregate = self.should_aggregate_products(batch)
            if aggregate:
                row = self.find_row_for_product(session, batch, product)
                if row:
                    # set flag so caller can inspect/know what happened here
                    row.__existing_reused__ = True
                    return row

            row = self.make_row()
            row.item_entry = entry
            row.product = product
            self.capture_current_units(row)

            if type2 and not aggregate: # TODO: what if aggregating?
                if embedded_price is None:
                    row.units = 1
                else:
                    # infer the unit count from the price embedded in the
                    # barcode, relative to the product's regular price
                    regprice = product.regular_price.price if product.regular_price else None
                    if regprice:
                        row.units = (embedded_price / regprice).quantize(
                            decimal.Decimal('0.01'))
                    else:
                        row.units = 1

            self.add_row(batch, row)
            return row

        if self.unknown_product_creates_row:
            row = self.make_row()
            row.item_entry = entry
            if entry.isdigit():
                # TODO: why not calc check digit?
                row.upc = GPC(entry, calc_check_digit=False)
            row.description = "(unknown product)"
            self.add_row(batch, row)
            return row

        # if we made it this far, then we did not find the product,
        # *and* we did not add the unknown product to batch.  so just
        # throw an error so caller logic can handle that
        raise ValueError("Product not found!")

    def refresh(self, batch, progress=None):
        """
        Reset the batch total cost and refresh the batch.  For a "zero all"
        batch, the existing rows are discarded and re-created; otherwise the
        parent class logic is used.
        """
        batch.total_cost = 0

        # destroy and re-create data rows if batch is zero-all
        if batch.mode == self.enum.INVENTORY_MODE_ZERO_ALL:
            del batch.data_rows[:]
            batch.rowcount = 0
            self.populate_zero_all(batch, progress=progress)
            return True

        return super(InventoryBatchHandler, self).refresh(batch, progress=progress)

    def refresh_row(self, row):
        """
        Inspect a row from the source data and populate additional attributes
        for it, according to what we find in the database.
        """
        product = row.product
        if not product:
            # row has no product reference yet; try to find one via UPC
            if row.upc:
                session = orm.object_session(row)
                product = api.get_product_by_upc(session, row.upc)
            if not product:
                row.status_code = row.STATUS_PRODUCT_NOT_FOUND
                return

        # current / static attributes
        row.product = product
        row.upc = product.upc
        row.item_id = product.item_id
        row.brand_name = six.text_type(product.brand or '')
        row.description = product.description
        row.size = product.size
        row.status_code = row.STATUS_OK
        row.case_quantity = (product.cost.case_size or 1) if product.cost else 1

        if row.previous_units_on_hand is not None:
            row.variance = self.total_units(row) - row.previous_units_on_hand

        # TODO: is this a sufficient check?  need to avoid overwriting a cost
        # value which has been manually set, but this also means the first
        # value that lands will stick, and e.g. new cost would be ignored
        if row.unit_cost is None:
            row.unit_cost = self.get_unit_cost(row)

        self.refresh_totals(row)

    def total_units(self, row):
        """
        Returns the total unit count for the row, i.e. cases multiplied by
        case quantity, plus units.  Missing cases and units count as zero,
        and a missing case quantity counts as one.
        """
        return (row.cases or 0) * (row.case_quantity or 1) + (row.units or 0)

    def capture_current_units(self, row):
        """
        Capture the "current" (aka. "previous") unit count for the row, if
        applicable.
        """
        product = row.product
        if product and product.inventory:
            row.previous_units_on_hand = product.inventory.on_hand

    def refresh_totals(self, row):
        """
        Recalculate the total cost for the row and add it to the running
        total cost for the batch.  A row with no unit cost gets no total.
        """
        batch = row.batch
        if row.unit_cost is not None:
            row.total_cost = row.unit_cost * (row.full_unit_quantity or 0)
            batch.total_cost = (batch.total_cost or 0) + row.total_cost
        else:
            row.total_cost = None

    def get_unit_cost(self, row):
        """
        Returns the unit cost from the cost record of the row's product, or
        ``None`` if the row has no product or the product has no cost.
        """
        if row.product and row.product.cost:
            return row.product.cost.unit_cost
        return None

    def execute(self, batch, progress=None, **kwargs):
        """
        Execute the batch, by updating local inventory according to its
        active rows.  Returns ``True``.
        """
        rows = batch.active_rows()
        self.update_rattail_inventory(batch, rows, progress=progress)
        return True

    def update_rattail_inventory(self, batch, rows, progress=None):
        """
        Set the on-hand quantity for the product of each given row, to the
        unit count of the row.  An inventory record is created for the
        product if it does not yet have one.  Rows which have no product are
        skipped, with a warning logged.
        """

        def update(row, i):
            product = row.product
            if not product:
                # e.g. row was added for an unknown product; there is no
                # inventory to update, so skip instead of crashing
                log.warning("skipping inventory update for row with "
                            "no product: %s", row)
                return
            inventory = product.inventory
            if not inventory:
                inventory = product.inventory = model.ProductInventory()
            inventory.on_hand = row.units

        self.progress_loop(update, rows, progress,
                           message="Updating local inventory")
|