Files
@ 87f11679b91f
Branch filter:
Location: rattail-project/rattail/rattail/luigi.py
87f11679b91f
3.6 KiB
text/x-python
Initial narrative docs for Configuration
not really complete, but gosh let's get something in place
not really complete, but gosh let's get something in place
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 | # -*- coding: utf-8; -*-
################################################################################
#
# Rattail -- Retail Software Framework
# Copyright © 2010-2020 Lance Edgar
#
# This file is part of Rattail.
#
# Rattail is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rattail is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# Rattail. If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
"""
Luigi utilities
"""
from __future__ import unicode_literals, absolute_import
import os
import datetime
import subprocess
import sys
import logging
import luigi
import six
class OvernightTask(luigi.Task):
"""
Base class for overnight automation tasks.
"""
date = luigi.DateParameter()
# TODO: subclass must define this
filename = None
# how long should we wait after task completes, for datasync to catch up?
datasync_wait_minutes = None
def output(self):
return luigi.LocalTarget('{}/{}'.format(self.date.strftime('%Y/%m/%d'), self.filename))
def run_command(self):
raise NotImplementedError
def touch_output(self):
with self.output().open('w') as f:
pass
def datasync_wait(self, minutes=None, initial_delay=60):
"""
:param int initial_delay: Number of seconds for the initial delay,
before we actually start to wait on the queue to clear out.
"""
if minutes is None:
minutes = self.datasync_wait_minutes or 10
# sometimes the datasync queue can take a moment to actually "fill up"
# initially, so we wait a full minute to ensure that happens before we
# start actually waiting for it to empty out again
if initial_delay:
subprocess.check_call(['sleep', six.text_type(initial_delay)])
subprocess.check_call([
'bin/rattail',
'--config=app/cron.conf',
'datasync',
'--timeout={}'.format(minutes),
'wait',
])
def run(self):
workdir = os.getcwd()
os.chdir(sys.prefix)
self.date_plus = self.date + datetime.timedelta(days=1)
self.run_command()
if self.datasync_wait_minutes:
self.datasync_wait()
os.chdir(workdir)
self.touch_output()
class WarnSummaryIfProblems(logging.Filter):
"""
Custom logging filter, to elevate the Luigi "execution summary" message to
WARNING level, if any problems are detected for the run. Note that this
simply checks for the ``:)`` message to know if there were problem.
"""
good_messages = [
"This progress looks :) because there were no failed tasks or missing external dependencies",
"This progress looks :) because there were no failed tasks or missing dependencies",
]
def filter(self, record):
if record.name == 'luigi-interface' and record.levelno == logging.INFO:
if "===== Luigi Execution Summary =====" in record.msg:
if not any([msg in record.msg for msg in self.good_messages]):
record.levelno = logging.WARNING
record.levelname = 'WARNING'
return True
|