ModelQueue API Reference
ModelQueue is an Apache2 licensed task queue based on Django models.
The examples below assume the following in appname/models.py:
import modelqueue
from django.db import models

class Task(models.Model):
    data = models.TextField()
    status = modelqueue.StatusField(
        db_index=True,
        # ^-- Index for faster queries.
        default=modelqueue.Status.waiting,
        # ^-- Waiting state is ready to run.
    )
Functions
- modelqueue.run(queryset, field, action, retry=3, timeout=datetime.timedelta(seconds=3600), delay=datetime.timedelta(0))
Run action on results from queryset in queue defined by field.
For example in appname/management/commands/process_tasks.py:
import time

import modelqueue
from django.core.management.base import BaseCommand

from appname.models import Task
# ^-- Absolute import: this file lives in appname/management/commands/.

class Command(BaseCommand):

    def handle(self, *args, **options):
        while True:
            task = modelqueue.run(
                Task.objects.all(),
                # ^-- Queryset of models to process.
                'status',
                # ^-- Field name for model queue.
                self.process,
                # ^-- Callable to process model.
            )
            if task is None:
                time.sleep(1)
                # ^-- Bring your own parallelism/concurrency.

    def process(self, task):
        pass  # Process task models.
- Parameters
queryset – Django queryset
field (str) – field name
action (function) – applied to models from queryset
retry (int) – max retry count (limit 8)
timeout (timedelta) – max runtime for action
delay (timedelta) – delay time after retry
- Returns
model from queryset or None if no waiting model
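The management command above sleeps whenever modelqueue.run() returns None. The following is a minimal sketch of that polling pattern, kept free of Django so that it runs standalone. The names poll_queue, run_once, idle_sleep, and max_idle are hypothetical and not part of modelqueue; run_once stands in for a zero-argument callable that wraps modelqueue.run().

```python
import time


def poll_queue(run_once, idle_sleep=1.0, max_idle=3, sleep=time.sleep):
    """Call run_once repeatedly; stop after max_idle consecutive empty polls.

    run_once is a hypothetical stand-in for a callable that wraps
    modelqueue.run() and returns the processed model or None.
    """
    processed = []
    idle = 0
    while idle < max_idle:
        task = run_once()
        if task is None:
            idle += 1
            sleep(idle_sleep)  # Queue is empty: wait before polling again.
        else:
            idle = 0
            processed.append(task)
    return processed


# Demonstration with an in-memory list standing in for the queue.
pending = ['a', 'b']
results = poll_queue(
    lambda: pending.pop(0) if pending else None,
    sleep=lambda seconds: None,  # Skip real sleeping in the demonstration.
)
```

Bounding the number of idle polls lets a worker exit cleanly when the queue stays empty; a long-running worker would instead loop forever as in the command above.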
- modelqueue.admin_list_filter(field)
Return Django admin list filter for field describing model queue.
For example in appname/admin.py:
import modelqueue
from django.contrib import admin

class TaskAdmin(admin.ModelAdmin):
    list_filter = [
        modelqueue.admin_list_filter('status'),
        # ^-- Filter tasks in admin by queue state.
    ]
- Parameters
field (str) – field name
- Returns
Django admin model queue list filter
- modelqueue.now()
Return now datetime in UTC timezone.
Exceptions
- class modelqueue.Retry(delay=None)
Retry processing the task.
When raised in an action callable from modelqueue.run(), the exception signals that the task should be retried. Retry does not increment the attempt count of the task.
The optional delay parameter can be used to override the delay given in modelqueue.run().
- class modelqueue.Abort(delay=None)
Abort processing the task.
When raised in an action callable from modelqueue.run(), the exception signals that the task was aborted. Abort does increment the attempt count of the task. If the attempts limit is reached then the task will be canceled.
The optional delay parameter can be used to override the delay given in modelqueue.run().
- class modelqueue.Cancel
Cancel processing the task.
When raised in an action callable from modelqueue.run(), the exception signals that the task should be canceled. Cancel both increments the attempt count and changes the task state to canceled.
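The effect of the three exceptions on the attempt count and state can be summarised with a small standalone sketch. The exception classes below are stand-ins rather than the real modelqueue classes, and next_status is a hypothetical helper. Three points are assumptions that the text above does not confirm: a retried or aborted task returns to the waiting state, a successful action leaves the attempt count unchanged, and the limit counts as reached once attempts exceeds retry.

```python
class Retry(Exception):
    """Stand-in for modelqueue.Retry (illustration only)."""


class Abort(Exception):
    """Stand-in for modelqueue.Abort (illustration only)."""


class Cancel(Exception):
    """Stand-in for modelqueue.Cancel (illustration only)."""


def next_status(action, attempts, retry=3):
    """Return (state name, attempts) after running action once."""
    try:
        action()
    except Retry:
        return 'waiting', attempts  # Attempt count is not incremented.
    except Abort:
        attempts += 1
        # Assumption: the limit is reached once attempts exceeds retry.
        state = 'canceled' if attempts > retry else 'waiting'
        return state, attempts
    except Cancel:
        return 'canceled', attempts + 1  # Increment and cancel.
    return 'finished', attempts  # Assumption: success keeps the count.


def raiser(exc):
    """Build an action that raises exc, for demonstration."""
    def action():
        raise exc
    return action
```

For example, next_status(raiser(Abort()), 0) gives ('waiting', 1), while next_status(raiser(Cancel()), 1) gives ('canceled', 2).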
State
- class modelqueue.State(value, name)
Model Queue State
>>> state = State(1, 'created')
>>> assert state == 1
>>> assert state.name == 'created'
>>> print(state)
created
>>> repr(state)
"State(1, 'created')"
>>> State.created
State(1, 'created')
- created = State(1, 'created')
- waiting = State(2, 'waiting')
- working = State(3, 'working')
- finished = State(4, 'finished')
- canceled = State(5, 'canceled')
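The doctest for State shows a value that compares equal to an integer while printing as its name. One way to get that behaviour is an int subclass that carries a name attribute. This is only a sketch under that assumption; the reference does not say how modelqueue implements State, and SketchState is a hypothetical name.

```python
class SketchState(int):
    """Integer that also carries a human-readable name (illustration only)."""

    def __new__(cls, value, name):
        state = super().__new__(cls, value)
        state.name = name
        return state

    def __str__(self):
        return self.name

    def __repr__(self):
        return f'{type(self).__name__}({int(self)}, {self.name!r})'


created = SketchState(1, 'created')
waiting = SketchState(2, 'waiting')
```

Because the value is still an integer, states sort in queue order and can be used directly in arithmetic on status values.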
Status
- class modelqueue.Status
Model Queue Status
64-bit Signed Integer Field Format:
state         priority          attempt
  |  |-----------------------|     |
  2   2018 03 27 14 43 25 759      0
       |   |  |  |  |  |   |
       |   |  |  |  |  |   millisecond
       |   |  |  |  |  second
       |   |  |  |  minute
       |   |  |  hour
       |   |  day
       |   month
       year
>>> status = Status(3201801020304567896)
>>> assert status.state == State.working
>>> assert status.priority == 20180102030456789
>>> assert status.attempts == 6
>>> Status.canceled(123, 5)
Status(5000000000000001235)
- states = [State(1, 'created'), State(2, 'waiting'), State(3, 'working'), State(4, 'finished'), State(5, 'canceled')]
- property attempts
Return attempts of status.
>>> status = Status(3201801020304567896)
>>> status.attempts
6
- classmethod canceled(priority=None, attempts=0)
Return new canceled status with given priority and attempts.
When priority is None (the default), uses now().
- Parameters
priority – integer or datetime, lower is higher priority
attempts (int) – number of previous attempts (0 through 9)
- Returns
status
- classmethod combine(state, priority, attempts)
Combine state, priority, and attempts fields into status.
>>> Status.combine(State.waiting, 0, 1)
Status(2000000000000000001)
>>> Status.combine('waiting', 0, 2)
Status(2000000000000000002)
>>> priority = dt.datetime(2018, 1, 2, 3, 4, 56, 789123)
>>> Status.combine(State.waiting, priority, 3)
Status(2201801020304567893)
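The arithmetic behind these values can be sketched in plain Python. The helper combine_sketch is hypothetical and only mirrors the documented digit layout (one state digit, seventeen priority digits, one attempts digit); it accepts integer states only and is not the library's implementation.

```python
import datetime as dt


def combine_sketch(state, priority, attempts):
    """Pack state, priority, and attempts into one integer (illustration)."""
    if isinstance(priority, dt.datetime):
        # YYYYMMDDHHMMSS followed by three millisecond digits.
        stamp = priority.strftime('%Y%m%d%H%M%S')
        priority = int(stamp) * 1000 + priority.microsecond // 1000
    return state * 10 ** 18 + priority * 10 + attempts


earlier = combine_sketch(2, dt.datetime(2018, 1, 2, 3, 4, 56, 789123), 3)
later = combine_sketch(2, dt.datetime(2018, 3, 27, 14, 43, 25, 759000), 0)
```

Within one state, an earlier datetime yields a smaller integer, which matches the rule that a lower priority value is processed first. Microseconds beyond the millisecond are dropped, as in the doctest above.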
- classmethod created(priority=None, attempts=0)
Return new created status with given priority and attempts.
When priority is None (the default), uses now().
- Parameters
priority – integer or datetime, lower is higher priority
attempts (int) – number of previous attempts (0 through 9)
- Returns
status
- classmethod filter(field, state)
Return keyword arguments to filter field by state for querysets.
For example:
Task.objects.filter(**Status.filter('fieldname', State.working))
- Parameters
field (str) – field name
state (State) – model queue state
- Returns
keyword arguments
>>> kwargs = Status.filter('status', State.waiting)
>>> assert len(kwargs) == 2
>>> assert kwargs['status__gte'] == Status.minimum(State.waiting)
>>> assert kwargs['status__lte'] == Status.maximum(State.waiting)
>>> value = Status.filter('status', 'waiting')
>>> assert kwargs == value
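Because the state occupies the leading digit, every status in a given state falls in one contiguous integer range, so a state filter reduces to a pair of range lookups. A minimal sketch with hypothetical helper names (minimum_sketch, maximum_sketch, filter_sketch) that takes integer states only and does not touch Django:

```python
def minimum_sketch(state):
    """Smallest status value whose leading digit is state."""
    return state * 10 ** 18


def maximum_sketch(state):
    """Largest status value whose leading digit is state."""
    return state * 10 ** 18 + 10 ** 18 - 1


def filter_sketch(field, state):
    """Build range-lookup keyword arguments for a queryset filter."""
    return {
        f'{field}__gte': minimum_sketch(state),
        f'{field}__lte': maximum_sketch(state),
    }


working_kwargs = filter_sketch('status', 3)
```

A range over an indexed integer column is the reason the model example sets db_index=True on the status field.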
- classmethod finished(priority=None, attempts=0)
Return new finished status with given priority and attempts.
When priority is None (the default), uses now().
- Parameters
priority – integer or datetime, lower is higher priority
attempts (int) – number of previous attempts (0 through 9)
- Returns
status
- classmethod maximum(state)
Calculate status maximum value given state.
>>> Status.maximum(State.working)
3999999999999999999
>>> Status.maximum('working')
3999999999999999999
- Parameters
state – status state
- Returns
maximum status value with given state
- classmethod minimum(state)
Calculate status minimum value given state.
>>> Status.minimum(State.working)
3000000000000000000
>>> Status.minimum('working')
3000000000000000000
- Parameters
state – status state
- Returns
minimum status value with given state
- parse(datetime=True)
Parse status into state, priority, and attempts fields.
If datetime is True (the default), then parse priority into datetime object. Priority has format:
        priority
|-----------------------|
 2018 03 27 14 43 25 759
  |   |  |  |  |  |   |
  |   |  |  |  |  |   millisecond
  |   |  |  |  |  second
  |   |  |  |  minute
  |   |  |  hour
  |   |  day
  |   month
  year
- Parameters
datetime (bool) – parse priority as datetime (default True)
- Returns
tuple of state, priority, and attempts
>>> status = Status(1201801020304567895)
>>> state, priority, attempts = status.parse()
>>> state
State(1, 'created')
>>> priority
datetime.datetime(2018, 1, 2, 3, 4, 56, 789000, tzinfo=<UTC>)
>>> attempts
5
>>> status = Status(3000000000000000007)
>>> status.parse(datetime=False)
(State(3, 'working'), 0, 7)
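Parsing reverses the packing: two integer divisions split off the state and attempts digits, and the remaining seventeen digits are read as a UTC timestamp with millisecond precision. The helper parse_sketch below is hypothetical; it returns a plain integer for the state rather than a State object and is not the library's code.

```python
import datetime as dt


def parse_sketch(value, as_datetime=True):
    """Split a status integer into (state, priority, attempts)."""
    state, rest = divmod(value, 10 ** 18)
    priority, attempts = divmod(rest, 10)
    if as_datetime:
        digits = f'{priority:017d}'
        # First 14 digits are YYYYMMDDHHMMSS, last 3 are milliseconds.
        moment = dt.datetime.strptime(digits[:14], '%Y%m%d%H%M%S')
        priority = moment.replace(
            microsecond=int(digits[14:]) * 1000,
            tzinfo=dt.timezone.utc,
        )
    return state, priority, attempts


parsed = parse_sketch(1201801020304567895)
raw = parse_sketch(3000000000000000007, as_datetime=False)
```

An integer priority such as 0 is not a valid timestamp, so in this sketch it has to be read with as_datetime=False, just as the doctest above passes datetime=False.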
- property priority
Return priority of status.
>>> status = Status(3201801020304567896)
>>> status.priority
20180102030456789
- property state
Return state of status.
>>> status = Status(3201801020304567896)
>>> status.state
State(3, 'working')
- classmethod tally(queryset, field)
Return mapping of state to count from field in queryset.
For example:
Status.tally(Task.objects.all(), 'status')
- Parameters
queryset – Django queryset
field (str) – field name
- Returns
mapping of state names to counts
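To illustrate the shape of the result, the sketch below tallies a plain list of status integers instead of a queryset. The names STATE_NAMES and tally_sketch are hypothetical, and reporting a zero count for states that do not occur is a choice made for this sketch, not documented behaviour.

```python
from collections import Counter

STATE_NAMES = {
    1: 'created',
    2: 'waiting',
    3: 'working',
    4: 'finished',
    5: 'canceled',
}


def tally_sketch(values):
    """Count status integers by the state stored in their leading digit."""
    counts = Counter(STATE_NAMES[value // 10 ** 18] for value in values)
    # Assumption of this sketch: absent states are reported with count 0.
    return {name: counts[name] for name in STATE_NAMES.values()}


summary = tally_sketch([
    2000000000000000001,
    2201801020304567893,
    3000000000000000007,
])
```

Against a real table the counts would come from the database rather than from Python iteration.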
- classmethod waiting(priority=None, attempts=0)
Return new waiting status with given priority and attempts.
When priority is None (the default), uses now().
- Parameters
priority – integer or datetime, lower is higher priority
attempts (int) – number of previous attempts (0 through 9)
- Returns
status
- classmethod working(priority=None, attempts=0)
Return new working status with given priority and attempts.
When priority is None (the default), uses now().
- Parameters
priority – integer or datetime, lower is higher priority
attempts (int) – number of previous attempts (0 through 9)
- Returns
status
StatusField
- modelqueue.StatusField
alias of
django.db.models.fields.BigIntegerField
Constants
- modelqueue.ONE_HOUR
- modelqueue.ZERO_SECS