mirror of
https://github.com/jazzband/django-celery-monitor.git
synced 2026-05-25 15:23:53 +00:00
Compare commits
45 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b86acddc4 | ||
|
|
5320d6014c | ||
|
|
9fe14325e8 | ||
|
|
c055ee8d2c | ||
|
|
aec77ca926 | ||
|
|
6ec13500d0 | ||
|
|
ca942e2408 | ||
|
|
1f890c977d | ||
|
|
a91a8eed57 | ||
|
|
7012454230 | ||
|
|
9591ff917a | ||
|
|
08fc27346b | ||
|
|
2b761082b7 | ||
|
|
6d1276692f | ||
|
|
c21f7274a6 | ||
|
|
25266115f6 | ||
|
|
62ba936a79 | ||
|
|
358fbc6e46 | ||
|
|
4797e77adf | ||
|
|
cfcc54bc82 | ||
|
|
b11aa0e380 | ||
|
|
961142d944 | ||
|
|
eaf07691a2 | ||
|
|
72d895d93b | ||
|
|
4ae8a39a32 | ||
|
|
04131fecde | ||
|
|
93fbe8421c | ||
|
|
e36dc66467 | ||
|
|
4526b8c8b2 | ||
|
|
8b94598727 | ||
|
|
0c7350ef1e | ||
|
|
87be921761 | ||
|
|
a9c514fabd | ||
|
|
539579a868 | ||
|
|
4a628dd395 | ||
|
|
60d7200f49 | ||
|
|
830c44f29e | ||
|
|
0182845614 | ||
|
|
9d41cfb97a | ||
|
|
09c64f73be | ||
|
|
68c9bb847e | ||
|
|
940c7115cb | ||
|
|
ce9ac69091 | ||
|
|
92f98c7d25 | ||
|
|
baae2f3466 |
35 changed files with 485 additions and 220 deletions
|
|
@ -1,5 +1,5 @@
|
|||
[bumpversion]
|
||||
current_version = 1.0.1
|
||||
current_version = 1.1.2
|
||||
commit = True
|
||||
tag = True
|
||||
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?P<releaselevel>[a-z]+)?
|
||||
|
|
|
|||
53
.github/workflows/release.yml
vendored
Normal file
53
.github/workflows/release.yml
vendored
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
name: Release
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- '*'
|
||||
|
||||
jobs:
|
||||
build:
|
||||
if: github.repository == 'jazzband/django-celery-monitor'
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: 3.8
|
||||
|
||||
- name: Get pip cache dir
|
||||
id: pip-cache
|
||||
run: |
|
||||
echo "::set-output name=dir::$(pip cache dir)"
|
||||
|
||||
- name: Cache
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.pip-cache.outputs.dir }}
|
||||
key: release-${{ hashFiles('**/setup.py') }}
|
||||
restore-keys: |
|
||||
release-
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install -U pip
|
||||
python -m pip install -U setuptools twine wheel
|
||||
|
||||
- name: Build package
|
||||
run: |
|
||||
python setup.py --version
|
||||
python setup.py sdist --format=gztar bdist_wheel
|
||||
twine check dist/*
|
||||
|
||||
- name: Upload packages to Jazzband
|
||||
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags')
|
||||
uses: pypa/gh-action-pypi-publish@master
|
||||
with:
|
||||
user: jazzband
|
||||
password: ${{ secrets.JAZZBAND_RELEASE_KEY }}
|
||||
repository_url: https://jazzband.co/projects/django-celery-monitor/upload
|
||||
48
.github/workflows/test.yml
vendored
Normal file
48
.github/workflows/test.yml
vendored
Normal file
|
|
@ -0,0 +1,48 @@
|
|||
name: Test
|
||||
|
||||
on: [push, pull_request]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
fail-fast: false
|
||||
max-parallel: 5
|
||||
matrix:
|
||||
python-version: ['2.7', '3.5', '3.6']
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Set up Python ${{ matrix.python-version }}
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: ${{ matrix.python-version }}
|
||||
|
||||
- name: Get pip cache dir
|
||||
id: pip-cache
|
||||
run: |
|
||||
echo "::set-output name=dir::$(pip cache dir)"
|
||||
|
||||
- name: Cache
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: ${{ steps.pip-cache.outputs.dir }}
|
||||
key:
|
||||
${{ matrix.python-version }}-v1-${{ hashFiles('**/setup.py') }}
|
||||
restore-keys: |
|
||||
${{ matrix.python-version }}-v1-
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install --upgrade tox tox-gh-actions
|
||||
|
||||
- name: Tox tests
|
||||
run: |
|
||||
tox -v
|
||||
|
||||
- name: Upload coverage
|
||||
uses: codecov/codecov-action@v1
|
||||
with:
|
||||
name: Python ${{ matrix.python-version }}
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -28,3 +28,4 @@ cover/
|
|||
.cache/
|
||||
htmlcov/
|
||||
coverage.xml
|
||||
.tox/
|
||||
|
|
|
|||
21
.travis.yml
21
.travis.yml
|
|
@ -1,21 +0,0 @@
|
|||
language: python
|
||||
sudo: false
|
||||
python:
|
||||
- '2.7'
|
||||
- '3.4'
|
||||
- '3.5'
|
||||
- '3.6'
|
||||
install: travis_retry pip install -U codecov tox-travis
|
||||
script: tox -v --travis-after -- -v
|
||||
after_success:
|
||||
- codecov
|
||||
deploy:
|
||||
provider: pypi
|
||||
user: jezdez
|
||||
password:
|
||||
secure: IyvYrWsNRfmKH5Ib8B6M/F/+EFadWG1z3qs0l5nFAYAx5LqZuL0esLPMLOxgigfN5Ii8Ne1FnJ3KDV8B/i8V6ApBJgiE+0yeP/ExQZ/WQ2ug/CcvxgIFhRN79WzwpWTqRvuYRlxGnA1cp+VLFAweC4dqzXLIzMFQw9uN8h/1uOszc9VsW4asq7/pMFutB0hYMjjZV2RrEARuCVsw93hROB/beGTAmfUOURoVv7zZ1HkcU6fUHDOU10ip7kNxNPVfXk7Z/HOyMti9Hgg6ZGTd7afCN1XBCPj+9nHbOghiEQ8RlaqN9EnoaUfC6H540QuIZISO8aEHxQqzpx8yWNScf/8ZZq3LsBCL6EtzuRWg9HdbZ7DkdezTUHXt8Rnk5ehF8H/bv43ZawTxXxBLTOsH9+ffj98BtpWWAdwEarRLTe61unw62AL7FmCzrfK1amsP74TdlZqTpJTq3Lgb3n31BxC4y4ONhjvlSKxX/z5PO/S9fb94R1v1kAMGeJcB6xtoxNjvfOov3KuRrWdDW+DL8k5tatJSBVyMJMHaOoLFxgmxZG71QZjX6v9UUKXjXD2nwTq9Z8A1aAjQibPMQXRqNErg9uax5h9B1ugWsjp+zeiYYlsQyCCib4h17beVgxVuEiwaDhN0nuaRJKVWWWoDNgwsNF8v5uJTyZNDCk7wFK0=
|
||||
on:
|
||||
tags: true
|
||||
repo: jezdez/django-celery-monitor
|
||||
python: 3.5
|
||||
distributions: sdist bdist_wheel
|
||||
|
|
@ -4,6 +4,43 @@
|
|||
Change history
|
||||
================
|
||||
|
||||
.. _version-1.1.2:
|
||||
|
||||
:release-date: 2017-05-18 11:30 a.m. UTC+2
|
||||
:release-by: Jannis Leidel
|
||||
|
||||
- More packaging fixes. Sigh.
|
||||
|
||||
.. _version-1.1.1:
|
||||
|
||||
:release-date: 2017-05-18 10:30 a.m. UTC+2
|
||||
:release-by: Jannis Leidel
|
||||
|
||||
- Fix the folder that the extra stylesheet file was stored in.
|
||||
|
||||
.. _version-1.1.0:
|
||||
|
||||
:release-date: 2017-05-11 10:25 p.m. UTC+2
|
||||
:release-by: Jannis Leidel
|
||||
|
||||
- Use ``SELECT FOR UPDATE`` SQL statements for updating the task and worker
|
||||
states to improve resilliance against race conditions by multiple
|
||||
simultaneously running cameras.
|
||||
|
||||
- Move worker state cache from in-process dictionary into database side
|
||||
timestamp to decide whether to do another worker update or not.
|
||||
|
||||
- Improved code structure by moving all utilities into same module.
|
||||
|
||||
.. _version-1.0.2:
|
||||
|
||||
:release-date: 2017-05-08 16:05 a.m. UTC+2
|
||||
:release-by: Jannis Leidel
|
||||
|
||||
- Import Django models inline to prevent import time side effect.
|
||||
|
||||
- Run django.setup() when installing the Camera.
|
||||
|
||||
.. _version-1.0.1:
|
||||
|
||||
:release-date: 2017-05-03 10:17 a.m. UTC+2
|
||||
|
|
|
|||
46
CODE_OF_CONDUCT.md
Normal file
46
CODE_OF_CONDUCT.md
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
# Code of Conduct
|
||||
|
||||
As contributors and maintainers of the Jazzband projects, and in the interest of
|
||||
fostering an open and welcoming community, we pledge to respect all people who
|
||||
contribute through reporting issues, posting feature requests, updating documentation,
|
||||
submitting pull requests or patches, and other activities.
|
||||
|
||||
We are committed to making participation in the Jazzband a harassment-free experience
|
||||
for everyone, regardless of the level of experience, gender, gender identity and
|
||||
expression, sexual orientation, disability, personal appearance, body size, race,
|
||||
ethnicity, age, religion, or nationality.
|
||||
|
||||
Examples of unacceptable behavior by participants include:
|
||||
|
||||
- The use of sexualized language or imagery
|
||||
- Personal attacks
|
||||
- Trolling or insulting/derogatory comments
|
||||
- Public or private harassment
|
||||
- Publishing other's private information, such as physical or electronic addresses,
|
||||
without explicit permission
|
||||
- Other unethical or unprofessional conduct
|
||||
|
||||
The Jazzband roadies have the right and responsibility to remove, edit, or reject
|
||||
comments, commits, code, wiki edits, issues, and other contributions that are not
|
||||
aligned to this Code of Conduct, or to ban temporarily or permanently any contributor
|
||||
for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
|
||||
|
||||
By adopting this Code of Conduct, the roadies commit themselves to fairly and
|
||||
consistently applying these principles to every aspect of managing the jazzband
|
||||
projects. Roadies who do not follow or enforce the Code of Conduct may be permanently
|
||||
removed from the Jazzband roadies.
|
||||
|
||||
This code of conduct applies both within project spaces and in public spaces when an
|
||||
individual is representing the project or its community.
|
||||
|
||||
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by
|
||||
contacting the roadies at `roadies@jazzband.co`. All complaints will be reviewed and
|
||||
investigated and will result in a response that is deemed necessary and appropriate to
|
||||
the circumstances. Roadies are obligated to maintain confidentiality with regard to the
|
||||
reporter of an incident.
|
||||
|
||||
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version
|
||||
1.3.0, available at [https://contributor-covenant.org/version/1/3/0/][version]
|
||||
|
||||
[homepage]: https://contributor-covenant.org
|
||||
[version]: https://contributor-covenant.org/version/1/3/0/
|
||||
5
CONTRIBUTING.rst
Normal file
5
CONTRIBUTING.rst
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
.. image:: https://jazzband.co/static/img/jazzband.svg
|
||||
:target: https://jazzband.co/
|
||||
:alt: Jazzband
|
||||
|
||||
This is a `Jazzband <https://jazzband.co>`_ project. By contributing you agree to abide by the `Contributor Code of Conduct <https://jazzband.co/about/conduct>`_ and follow the `guidelines <https://jazzband.co/about/guidelines>`_.
|
||||
30
README.rst
30
README.rst
|
|
@ -2,13 +2,13 @@
|
|||
Celery Monitoring for Django
|
||||
============================
|
||||
|
||||
:Version: 1.0.1
|
||||
:Version: 1.1.2
|
||||
:Web: https://django-celery-monitor.readthedocs.io/
|
||||
:Download: https://pypi.python.org/pypi/django_celery_monitor
|
||||
:Source: https://github.com/jezdez/django-celery-monitor
|
||||
:Download: https://pypi.org/project/django_celery_monitor/
|
||||
:Source: https://github.com/jazzband/django-celery-monitor
|
||||
:Keywords: django, celery, events, monitoring
|
||||
|
||||
|build-status| |coverage| |license| |wheel| |pyversion| |pyimp|
|
||||
|jazzband| |build-status| |coverage| |license| |wheel| |pyversion| |pyimp|
|
||||
|
||||
About
|
||||
=====
|
||||
|
|
@ -38,14 +38,14 @@ Other parts of django-celery were released as
|
|||
Installation
|
||||
============
|
||||
|
||||
You can install django-celery-monitor either via the Python Package Index (PyPI)
|
||||
You can install django_celery_monitor either via the Python Package Index (PyPI)
|
||||
or from source.
|
||||
|
||||
To install using `pip`,:
|
||||
|
||||
.. code-block:: console
|
||||
|
||||
$ pip install -U django-celery-monitor
|
||||
$ pip install -U django_celery_monitor
|
||||
|
||||
Usage
|
||||
=====
|
||||
|
|
@ -72,7 +72,10 @@ To use this with your project you need to follow these steps:
|
|||
|
||||
.. code-block:: console
|
||||
|
||||
$ python manage.py migrate django_celery_monitor
|
||||
$ python manage.py migrate celery_monitor
|
||||
|
||||
#. Go to the Django admin of your site and look for the "Celery Monitor"
|
||||
section.
|
||||
|
||||
Starting the monitoring process
|
||||
===============================
|
||||
|
|
@ -121,13 +124,16 @@ In your Celery configuration simply set them to override the defaults, e.g.::
|
|||
|
||||
monitor_task_success_expires = timedelta(days=7)
|
||||
|
||||
.. |jazzband| image:: https://jazzband.co/static/img/badge.svg
|
||||
:target: https://jazzband.co/
|
||||
:alt: Jazzband
|
||||
|
||||
.. |build-status| image:: https://secure.travis-ci.org/jezdez/django-celery-monitor.svg?branch=master
|
||||
:alt: Build status
|
||||
:target: https://travis-ci.org/jezdez/django-celery-monitor
|
||||
.. |build-status| image:: https://github.com/jazzband/django-celery-monitor/workflows/Test/badge.svg
|
||||
:target: https://github.com/jazzband/django-celery-monitor/actions
|
||||
:alt: GitHub Actions
|
||||
|
||||
.. |coverage| image:: https://codecov.io/github/jezdez/django-celery-monitor/coverage.svg?branch=master
|
||||
:target: https://codecov.io/github/jezdez/django-celery-monitor?branch=master
|
||||
.. |coverage| image:: https://codecov.io/github/jazzband/django-celery-monitor/coverage.svg?branch=master
|
||||
:target: https://codecov.io/github/jazzband/django-celery-monitor?branch=master
|
||||
|
||||
.. |license| image:: https://img.shields.io/pypi/l/django-celery-monitor.svg
|
||||
:alt: BSD License
|
||||
|
|
|
|||
|
|
@ -10,10 +10,10 @@ import re
|
|||
|
||||
from collections import namedtuple
|
||||
|
||||
__version__ = '1.0.1'
|
||||
__version__ = '1.1.2'
|
||||
__author__ = 'Jannis Leidel'
|
||||
__contact__ = 'jannis@leidel.info'
|
||||
__homepage__ = 'https://github.com/jezdez/django-celery-monitor'
|
||||
__homepage__ = 'https://github.com/jazzband/django-celery-monitor'
|
||||
__docformat__ = 'restructuredtext'
|
||||
|
||||
# -eof meta-
|
||||
|
|
|
|||
|
|
@ -17,10 +17,9 @@ from celery import states
|
|||
from celery.task.control import broadcast, revoke, rate_limit
|
||||
from celery.utils.text import abbrtask
|
||||
|
||||
from .admin_utils import action, display_field, fixedwidth
|
||||
from .models import TaskState, WorkerState
|
||||
from .humanize import naturaldate
|
||||
from .utils import make_aware
|
||||
from .utils import action, display_field, fixedwidth, make_aware
|
||||
|
||||
|
||||
TASK_STATE_COLORS = {states.SUCCESS: 'green',
|
||||
|
|
|
|||
|
|
@ -1,53 +0,0 @@
|
|||
"""Some helpers for the admin monitors."""
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
from pprint import pformat
|
||||
|
||||
from django.utils.html import escape
|
||||
|
||||
FIXEDWIDTH_STYLE = '''\
|
||||
<span title="{0}" style="font-size: {1}pt; \
|
||||
font-family: Menlo, Courier; ">{2}</span> \
|
||||
'''
|
||||
|
||||
|
||||
def _attrs(**kwargs):
|
||||
def _inner(fun):
|
||||
for attr_name, attr_value in kwargs.items():
|
||||
setattr(fun, attr_name, attr_value)
|
||||
return fun
|
||||
return _inner
|
||||
|
||||
|
||||
def display_field(short_description, admin_order_field,
|
||||
allow_tags=True, **kwargs):
|
||||
"""Set some display_field attributes."""
|
||||
return _attrs(short_description=short_description,
|
||||
admin_order_field=admin_order_field,
|
||||
allow_tags=allow_tags, **kwargs)
|
||||
|
||||
|
||||
def action(short_description, **kwargs):
|
||||
"""Set some admin action attributes."""
|
||||
return _attrs(short_description=short_description, **kwargs)
|
||||
|
||||
|
||||
def fixedwidth(field, name=None, pt=6, width=16, maxlen=64, pretty=False):
|
||||
"""Render a field with a fixed width."""
|
||||
@display_field(name or field, field)
|
||||
def f(task):
|
||||
val = getattr(task, field)
|
||||
if pretty:
|
||||
val = pformat(val, width=width)
|
||||
if val.startswith("u'") or val.startswith('u"'):
|
||||
val = val[2:-1]
|
||||
shortval = val.replace(',', ',\n')
|
||||
shortval = shortval.replace('\n', '|br/|')
|
||||
|
||||
if len(shortval) > maxlen:
|
||||
shortval = shortval[:maxlen] + '...'
|
||||
styled = FIXEDWIDTH_STYLE.format(
|
||||
escape(val[:255]), pt, escape(shortval),
|
||||
)
|
||||
return styled.replace('|br/|', '<br/>')
|
||||
return f
|
||||
|
|
@ -1,17 +1,14 @@
|
|||
"""The Celery events camera."""
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
from collections import defaultdict
|
||||
from datetime import timedelta
|
||||
|
||||
from celery import states
|
||||
from celery.events.state import Task
|
||||
from celery.events.snapshot import Polaroid
|
||||
from celery.five import monotonic
|
||||
from celery.utils.imports import symbol_by_name
|
||||
from celery.utils.log import get_logger
|
||||
from celery.utils.time import maybe_iso8601
|
||||
|
||||
from .models import WorkerState, TaskState
|
||||
from .utils import fromtimestamp, correct_awareness
|
||||
|
||||
WORKER_UPDATE_FREQ = 60 # limit worker timestamp write freq.
|
||||
|
|
@ -24,22 +21,13 @@ debug = logger.debug
|
|||
|
||||
|
||||
class Camera(Polaroid):
|
||||
"""The Celery events Polaroid snapshot camera.
|
||||
|
||||
Stores task and worker state in the data models
|
||||
``django_celery_monitor.models.TaskState`` and
|
||||
``django_celery_monitor.models.WorkerState``.
|
||||
"""
|
||||
|
||||
TaskState = TaskState
|
||||
WorkerState = WorkerState
|
||||
"""The Celery events Polaroid snapshot camera."""
|
||||
|
||||
clear_after = True
|
||||
worker_update_freq = WORKER_UPDATE_FREQ
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(Camera, self).__init__(*args, **kwargs)
|
||||
self._last_worker_write = defaultdict(lambda: (None, None))
|
||||
# Expiry can be timedelta or None for never expire.
|
||||
self.app.add_defaults({
|
||||
'monitors_expire_success': timedelta(days=1),
|
||||
|
|
@ -47,6 +35,24 @@ class Camera(Polaroid):
|
|||
'monitors_expire_pending': timedelta(days=5),
|
||||
})
|
||||
|
||||
@property
|
||||
def TaskState(self):
|
||||
"""Return the data model to store task state in."""
|
||||
return symbol_by_name('django_celery_monitor.models.TaskState')
|
||||
|
||||
@property
|
||||
def WorkerState(self):
|
||||
"""Return the data model to store worker state in."""
|
||||
return symbol_by_name('django_celery_monitor.models.WorkerState')
|
||||
|
||||
def django_setup(self):
|
||||
import django
|
||||
django.setup()
|
||||
|
||||
def install(self):
|
||||
super(Camera, self).install()
|
||||
self.django_setup()
|
||||
|
||||
@property
|
||||
def expire_task_states(self):
|
||||
"""Return a twople of Celery task states and expiration timedeltas."""
|
||||
|
|
@ -64,16 +70,12 @@ class Camera(Polaroid):
|
|||
return fromtimestamp(heartbeat)
|
||||
|
||||
def handle_worker(self, hostname_worker):
|
||||
(hostname, worker) = hostname_worker
|
||||
last_write, obj = self._last_worker_write[hostname]
|
||||
if (not last_write or
|
||||
monotonic() - last_write > self.worker_update_freq):
|
||||
obj, _ = self.WorkerState.objects.update_or_create(
|
||||
hostname=hostname,
|
||||
defaults={'last_heartbeat': self.get_heartbeat(worker)},
|
||||
)
|
||||
self._last_worker_write[hostname] = (monotonic(), obj)
|
||||
return obj
|
||||
hostname, worker = hostname_worker
|
||||
return self.WorkerState.objects.update_heartbeat(
|
||||
hostname,
|
||||
heartbeat=self.get_heartbeat(worker),
|
||||
update_freq=self.worker_update_freq,
|
||||
)
|
||||
|
||||
def handle_task(self, uuid_task, worker=None):
|
||||
"""Handle snapshotted event."""
|
||||
|
|
@ -103,29 +105,17 @@ class Camera(Polaroid):
|
|||
if defaults[attr] is None]
|
||||
return self.update_task(task.state, task_id=uuid, defaults=defaults)
|
||||
|
||||
def update_task(self, state, **kwargs):
|
||||
objects = self.TaskState.objects
|
||||
defaults = kwargs.pop('defaults', None) or {}
|
||||
def update_task(self, state, task_id, defaults=None):
|
||||
defaults = defaults or {}
|
||||
if not defaults.get('name'):
|
||||
return
|
||||
obj, created = objects.get_or_create(defaults=defaults, **kwargs)
|
||||
if created:
|
||||
return obj
|
||||
else:
|
||||
if states.state(state) < states.state(obj.state):
|
||||
keep = Task.merge_rules[states.RECEIVED]
|
||||
defaults = dict(
|
||||
(k, v) for k, v in defaults.items()
|
||||
if k not in keep
|
||||
)
|
||||
return self.TaskState.objects.update_state(
|
||||
state=state,
|
||||
task_id=task_id,
|
||||
defaults=defaults,
|
||||
)
|
||||
|
||||
for k, v in defaults.items():
|
||||
setattr(obj, k, v)
|
||||
obj.save()
|
||||
|
||||
return obj
|
||||
|
||||
def on_shutter(self, state, commit_every=100):
|
||||
def on_shutter(self, state):
|
||||
|
||||
def _handle_tasks():
|
||||
for i, task in enumerate(state.tasks.items()):
|
||||
|
|
@ -136,8 +126,10 @@ class Camera(Polaroid):
|
|||
_handle_tasks()
|
||||
|
||||
def on_cleanup(self):
|
||||
expired = (self.TaskState.objects.expire_by_states(states, expires)
|
||||
for states, expires in self.expire_task_states)
|
||||
expired = (
|
||||
self.TaskState.objects.expire_by_states(states, expires)
|
||||
for states, expires in self.expire_task_states
|
||||
)
|
||||
dirty = sum(item for item in expired if item is not None)
|
||||
if dirty:
|
||||
debug('Cleanup: Marked %s objects as dirty.', dirty)
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ from __future__ import absolute_import, unicode_literals
|
|||
from datetime import datetime
|
||||
|
||||
from django.utils.translation import ungettext, ugettext as _
|
||||
from .utils import now
|
||||
from django.utils.timezone import now
|
||||
|
||||
|
||||
def pluralize_year(n):
|
||||
|
|
|
|||
|
|
@ -1,27 +1,82 @@
|
|||
"""The model managers."""
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
from datetime import timedelta
|
||||
|
||||
from celery import states
|
||||
from celery.events.state import Task
|
||||
from celery.utils.time import maybe_timedelta
|
||||
from django.db import connections, models, router, transaction
|
||||
from django.db import models, router, transaction
|
||||
|
||||
from .utils import now
|
||||
from .utils import Now
|
||||
|
||||
|
||||
class TaskStateManager(models.Manager):
|
||||
"""A custom models manager for the TaskState model with some helpers."""
|
||||
class ExtendedQuerySet(models.QuerySet):
|
||||
"""A custom model queryset that implements a few helpful methods."""
|
||||
|
||||
def connection_for_write(self):
|
||||
"""Return the database connection that is configured for writing."""
|
||||
return connections[router.db_for_write(self.model)]
|
||||
def select_for_update_or_create(self, defaults=None, **kwargs):
|
||||
"""Extend update_or_create with select_for_update.
|
||||
|
||||
Look up an object with the given kwargs, updating one with defaults
|
||||
if it exists, otherwise create a new one.
|
||||
Return a tuple (object, created), where created is a boolean
|
||||
specifying whether an object was created.
|
||||
|
||||
This is a backport from Django 1.11
|
||||
(https://code.djangoproject.com/ticket/26804) to support
|
||||
select_for_update when getting the object.
|
||||
"""
|
||||
defaults = defaults or {}
|
||||
lookup, params = self._extract_model_params(defaults, **kwargs)
|
||||
self._for_write = True
|
||||
with transaction.atomic(using=self.db):
|
||||
try:
|
||||
obj = self.select_for_update().get(**lookup)
|
||||
except self.model.DoesNotExist:
|
||||
obj, created = self._create_object_from_params(lookup, params)
|
||||
if created:
|
||||
return obj, created
|
||||
for k, v in defaults.items():
|
||||
setattr(obj, k, v() if callable(v) else v)
|
||||
obj.save(using=self.db)
|
||||
return obj, False
|
||||
|
||||
|
||||
class WorkerStateQuerySet(ExtendedQuerySet):
|
||||
"""A custom model queryset for the WorkerState model with some helpers."""
|
||||
|
||||
def update_heartbeat(self, hostname, heartbeat, update_freq):
|
||||
with transaction.atomic():
|
||||
# check if there was an update in the last n seconds?
|
||||
interval = Now() - timedelta(seconds=update_freq)
|
||||
recent_worker_updates = self.filter(
|
||||
hostname=hostname,
|
||||
last_update__gte=interval,
|
||||
)
|
||||
if recent_worker_updates.exists():
|
||||
# if yes, get the latest update and move on
|
||||
obj = recent_worker_updates.get()
|
||||
else:
|
||||
# if no, update the worker state and move on
|
||||
obj, _ = self.select_for_update_or_create(
|
||||
hostname=hostname,
|
||||
defaults={'last_heartbeat': heartbeat},
|
||||
)
|
||||
return obj
|
||||
|
||||
|
||||
class TaskStateQuerySet(ExtendedQuerySet):
|
||||
"""A custom model queryset for the TaskState model with some helpers."""
|
||||
|
||||
def active(self):
|
||||
"""Return all active task states."""
|
||||
return self.filter(hidden=False)
|
||||
|
||||
def expired(self, states, expires, nowfun=now):
|
||||
def expired(self, states, expires):
|
||||
"""Return all expired task states."""
|
||||
return self.filter(state__in=states,
|
||||
tstamp__lte=nowfun() - maybe_timedelta(expires))
|
||||
return self.filter(
|
||||
state__in=states,
|
||||
tstamp__lte=Now() - maybe_timedelta(expires),
|
||||
)
|
||||
|
||||
def expire_by_states(self, states, expires):
|
||||
"""Expire task with one of the given states."""
|
||||
|
|
@ -30,10 +85,26 @@ class TaskStateManager(models.Manager):
|
|||
|
||||
def purge(self):
|
||||
"""Purge all expired task states."""
|
||||
meta = self.model._meta
|
||||
with transaction.atomic():
|
||||
cursor = self.connection_for_write().cursor()
|
||||
cursor.execute(
|
||||
'DELETE FROM {0.db_table} WHERE hidden=%s'.format(meta),
|
||||
(True, ),
|
||||
self.using(
|
||||
router.db_for_write(self.model)
|
||||
).filter(hidden=True).delete()
|
||||
|
||||
def update_state(self, state, task_id, defaults):
|
||||
with transaction.atomic():
|
||||
obj, created = self.select_for_update_or_create(
|
||||
task_id=task_id,
|
||||
defaults=defaults,
|
||||
)
|
||||
if created:
|
||||
return obj
|
||||
|
||||
if states.state(state) < states.state(obj.state):
|
||||
keep = Task.merge_rules[states.RECEIVED]
|
||||
else:
|
||||
keep = {}
|
||||
for key, value in defaults.items():
|
||||
if key not in keep:
|
||||
setattr(obj, key, value)
|
||||
obj.save(update_fields=tuple(defaults.keys()))
|
||||
return obj
|
||||
|
|
|
|||
|
|
@ -0,0 +1,25 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
import django.utils.timezone
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('celery_monitor', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='workerstate',
|
||||
name='last_update',
|
||||
field=models.DateTimeField(
|
||||
default=django.utils.timezone.now,
|
||||
auto_now=True,
|
||||
verbose_name='last update',
|
||||
),
|
||||
preserve_default=False,
|
||||
),
|
||||
]
|
||||
|
|
@ -26,10 +26,11 @@ class WorkerState(models.Model):
|
|||
#: A :class:`~datetime.datetime` describing when the worker was last seen.
|
||||
last_heartbeat = models.DateTimeField(_('last heartbeat'), null=True,
|
||||
db_index=True)
|
||||
last_update = models.DateTimeField(_('last update'), auto_now=True)
|
||||
|
||||
#: A :class:`~django.db.models.Manager` instance
|
||||
#: A :class:`~django_celery_monitor.managers.ExtendedManager` instance
|
||||
#: to query the :class:`~django_celery_monitor.models.WorkerState` model.
|
||||
objects = models.Manager()
|
||||
objects = managers.WorkerStateQuerySet.as_manager()
|
||||
|
||||
class Meta:
|
||||
"""Model meta-data."""
|
||||
|
|
@ -106,7 +107,7 @@ class TaskState(models.Model):
|
|||
|
||||
#: A :class:`~django_celery_monitor.managers.TaskStateManager` instance
|
||||
#: to query the :class:`~django_celery_monitor.models.TaskState` model.
|
||||
objects = managers.TaskStateManager()
|
||||
objects = managers.TaskStateQuerySet.as_manager()
|
||||
|
||||
class Meta:
|
||||
"""Model meta-data."""
|
||||
|
|
|
|||
|
|
@ -4,20 +4,33 @@
|
|||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
from datetime import datetime
|
||||
from pprint import pformat
|
||||
|
||||
from django.conf import settings
|
||||
from django.db.models import DateTimeField, Func
|
||||
from django.utils import timezone
|
||||
from django.utils.html import escape
|
||||
|
||||
# see Issue celery/django-celery#222
|
||||
now_localtime = getattr(timezone, 'template_localtime', timezone.localtime)
|
||||
try:
|
||||
from django.db.models.functions import Now
|
||||
except ImportError:
|
||||
|
||||
class Now(Func):
|
||||
"""A backport of the Now function from Django 1.9.x."""
|
||||
|
||||
def now():
|
||||
"""Return the current date and time."""
|
||||
if getattr(settings, 'USE_TZ', False):
|
||||
return now_localtime(timezone.now())
|
||||
else:
|
||||
return timezone.now()
|
||||
template = 'CURRENT_TIMESTAMP'
|
||||
|
||||
def __init__(self, output_field=None, **extra):
|
||||
if output_field is None:
|
||||
output_field = DateTimeField()
|
||||
super(Now, self).__init__(output_field=output_field, **extra)
|
||||
|
||||
def as_postgresql(self, compiler, connection):
|
||||
# Postgres' CURRENT_TIMESTAMP means "the time at the start of the
|
||||
# transaction". We use STATEMENT_TIMESTAMP to be cross-compatible
|
||||
# with other databases.
|
||||
self.template = 'STATEMENT_TIMESTAMP()'
|
||||
return self.as_sql(compiler, connection)
|
||||
|
||||
|
||||
def make_aware(value):
|
||||
|
|
@ -49,3 +62,51 @@ def fromtimestamp(value):
|
|||
return make_aware(datetime.utcfromtimestamp(value))
|
||||
else:
|
||||
return datetime.fromtimestamp(value)
|
||||
|
||||
|
||||
FIXEDWIDTH_STYLE = '''\
|
||||
<span title="{0}" style="font-size: {1}pt; \
|
||||
font-family: Menlo, Courier; ">{2}</span> \
|
||||
'''
|
||||
|
||||
|
||||
def _attrs(**kwargs):
|
||||
def _inner(fun):
|
||||
for attr_name, attr_value in kwargs.items():
|
||||
setattr(fun, attr_name, attr_value)
|
||||
return fun
|
||||
return _inner
|
||||
|
||||
|
||||
def display_field(short_description, admin_order_field,
|
||||
allow_tags=True, **kwargs):
|
||||
"""Set some display_field attributes."""
|
||||
return _attrs(short_description=short_description,
|
||||
admin_order_field=admin_order_field,
|
||||
allow_tags=allow_tags, **kwargs)
|
||||
|
||||
|
||||
def action(short_description, **kwargs):
|
||||
"""Set some admin action attributes."""
|
||||
return _attrs(short_description=short_description, **kwargs)
|
||||
|
||||
|
||||
def fixedwidth(field, name=None, pt=6, width=16, maxlen=64, pretty=False):
|
||||
"""Render a field with a fixed width."""
|
||||
@display_field(name or field, field)
|
||||
def f(task):
|
||||
val = getattr(task, field)
|
||||
if pretty:
|
||||
val = pformat(val, width=width)
|
||||
if val.startswith("u'") or val.startswith('u"'):
|
||||
val = val[2:-1]
|
||||
shortval = val.replace(',', ',\n')
|
||||
shortval = shortval.replace('\n', '|br/|')
|
||||
|
||||
if len(shortval) > maxlen:
|
||||
shortval = shortval[:maxlen] + '...'
|
||||
styled = FIXEDWIDTH_STYLE.format(
|
||||
escape(val[:255]), pt, escape(shortval),
|
||||
)
|
||||
return styled.replace('|br/|', '<br/>')
|
||||
return f
|
||||
|
|
|
|||
|
|
@ -8,11 +8,11 @@ from sphinx_celery import conf
|
|||
globals().update(conf.build_config(
|
||||
'django_celery_monitor', __file__,
|
||||
project='django_celery_monitor',
|
||||
version_dev='1.1.0',
|
||||
version_stable='1.0.1',
|
||||
version_dev='1.2.0',
|
||||
version_stable='1.1.2',
|
||||
canonical_url='http://django-celery-monitor.readthedocs.io',
|
||||
webdomain='',
|
||||
github_project='jezdez/django-celery-monitor',
|
||||
github_project='jazzband/django-celery-monitor',
|
||||
copyright='2009-2017',
|
||||
django_settings='proj.settings',
|
||||
include_intersphinx={'python', 'sphinx', 'django', 'celery'},
|
||||
|
|
|
|||
|
|
@ -1,11 +0,0 @@
|
|||
=======================================
|
||||
``django_celery_monitor.admin_utils``
|
||||
=======================================
|
||||
|
||||
.. contents::
|
||||
:local:
|
||||
.. currentmodule:: django_celery_monitor.admin_utils
|
||||
|
||||
.. automodule:: django_celery_monitor.admin_utils
|
||||
:members:
|
||||
:undoc-members:
|
||||
|
|
@ -10,7 +10,6 @@
|
|||
.. toctree::
|
||||
:maxdepth: 1
|
||||
|
||||
django_celery_monitor.admin_utils
|
||||
django_celery_monitor.camera
|
||||
django_celery_monitor.humanize
|
||||
django_celery_monitor.managers
|
||||
|
|
|
|||
|
|
@ -1,2 +1,2 @@
|
|||
sphinx_celery>=1.1
|
||||
Django>=1.10
|
||||
Django>=1.10,<2.0
|
||||
|
|
|
|||
|
|
@ -4,8 +4,8 @@ flake8>=2.5.4
|
|||
flakeplus>=1.1
|
||||
tox>=2.3.1
|
||||
sphinx2rst>=1.0
|
||||
bumpversion
|
||||
pydocstyle
|
||||
docutils
|
||||
readme_renderer
|
||||
check-manifest
|
||||
bumpversion==0.5.3
|
||||
pydocstyle==2.0.0
|
||||
docutils==0.14
|
||||
readme-renderer==17.2
|
||||
check-manifest==0.36
|
||||
|
|
|
|||
|
|
@ -1,2 +1 @@
|
|||
pytest-cov
|
||||
codecov
|
||||
|
|
|
|||
|
|
@ -1 +0,0 @@
|
|||
django>=1.10,<1.11
|
||||
|
|
@ -1 +0,0 @@
|
|||
django>=1.11,<2
|
||||
|
|
@ -1 +0,0 @@
|
|||
django>=1.8,<1.9
|
||||
|
|
@ -1 +0,0 @@
|
|||
django>=1.9,<1.10
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
case>=1.3.1
|
||||
pytest>=3.0
|
||||
pytest-django
|
||||
pytz>dev
|
||||
pytz
|
||||
|
|
|
|||
|
|
@ -14,3 +14,7 @@ match-dir = [^migrations]
|
|||
|
||||
[wheel]
|
||||
universal = 1
|
||||
|
||||
[check-manifest]
|
||||
ignore =
|
||||
docs/_build*
|
||||
|
|
|
|||
2
setup.py
2
setup.py
|
|
@ -150,5 +150,5 @@ setuptools.setup(
|
|||
tests_require=reqs('test.txt'),
|
||||
cmdclass={'test': pytest},
|
||||
zip_safe=False,
|
||||
include_package_data=False,
|
||||
include_package_data=True,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
from datetime import datetime
|
||||
from datetime import datetime, timedelta
|
||||
from itertools import count
|
||||
from time import time
|
||||
|
||||
|
|
@ -15,7 +15,7 @@ from django.test.utils import override_settings
|
|||
from django.utils import timezone
|
||||
|
||||
from django_celery_monitor import camera, models
|
||||
from django_celery_monitor.utils import make_aware, now
|
||||
from django_celery_monitor.utils import make_aware
|
||||
|
||||
|
||||
_ids = count(0)
|
||||
|
|
@ -28,8 +28,8 @@ def Event(*args, **kwargs):
|
|||
return _Event(*args, **kwargs)
|
||||
|
||||
|
||||
@pytest.mark.django_db()
|
||||
@pytest.mark.usefixtures('depends_on_current_app')
|
||||
@pytest.mark.django_db
|
||||
class test_Camera:
|
||||
Camera = camera.Camera
|
||||
|
||||
|
|
@ -62,18 +62,22 @@ class test_Camera:
|
|||
worker.event('heartbeat', t, t, {})
|
||||
self.state.workers[worker.hostname] = worker
|
||||
assert (
|
||||
self.cam.get_heartbeat(worker) ==
|
||||
make_aware(datetime.fromtimestamp(t3))
|
||||
self.cam.get_heartbeat(worker) == make_aware(
|
||||
datetime.fromtimestamp(t3)
|
||||
)
|
||||
)
|
||||
|
||||
def test_handle_worker(self):
|
||||
worker = Worker(hostname='fuzzie')
|
||||
worker.event('online', time(), time(), {})
|
||||
self.cam._last_worker_write.clear()
|
||||
old_last_update = timezone.now() - timedelta(hours=1)
|
||||
models.WorkerState.objects.all().update(last_update=old_last_update)
|
||||
|
||||
m = self.cam.handle_worker((worker.hostname, worker))
|
||||
assert m
|
||||
assert m.hostname
|
||||
assert m.last_heartbeat
|
||||
assert m.last_update != old_last_update
|
||||
assert m.is_alive()
|
||||
assert str(m) == str(m.hostname)
|
||||
assert repr(m)
|
||||
|
|
@ -90,7 +94,7 @@ class test_Camera:
|
|||
assert mt.name == task.name
|
||||
assert str(mt)
|
||||
assert repr(mt)
|
||||
mt.eta = now()
|
||||
mt.eta = timezone.now()
|
||||
assert 'eta' in str(mt)
|
||||
assert mt in models.TaskState.objects.active()
|
||||
|
||||
|
|
@ -137,24 +141,28 @@ class test_Camera:
|
|||
task.event('received', tstamp, tstamp, {})
|
||||
mt = self.cam.handle_task((task.uuid, task))
|
||||
assert (
|
||||
mt.tstamp ==
|
||||
datetime(2016, 6, 1, 15, 0, 0, tzinfo=timezone.utc)
|
||||
mt.tstamp == datetime(
|
||||
2016, 6, 1, 15, 0, 0, tzinfo=timezone.utc
|
||||
)
|
||||
)
|
||||
assert (
|
||||
mt.eta ==
|
||||
datetime(2016, 6, 1, 15, 16, 17, 654321, tzinfo=timezone.utc)
|
||||
mt.eta == datetime(
|
||||
2016, 6, 1, 15, 16, 17, 654321, tzinfo=timezone.utc
|
||||
)
|
||||
)
|
||||
assert (
|
||||
mt.expires ==
|
||||
datetime(2016, 7, 1, 12, 16, 17, 765432, tzinfo=timezone.utc)
|
||||
mt.expires == datetime(
|
||||
2016, 7, 1, 12, 16, 17, 765432, tzinfo=timezone.utc
|
||||
)
|
||||
)
|
||||
|
||||
task = self.create_task(worker, eta='2016-06-04T15:16:17.654321')
|
||||
task.event('received', tstamp, tstamp, {})
|
||||
mt = self.cam.handle_task((task.uuid, task))
|
||||
assert (
|
||||
mt.eta ==
|
||||
datetime(2016, 6, 4, 15, 16, 17, 654321, tzinfo=timezone.utc)
|
||||
mt.eta == datetime(
|
||||
2016, 6, 4, 15, 16, 17, 654321, tzinfo=timezone.utc
|
||||
)
|
||||
)
|
||||
|
||||
with override_settings(USE_TZ=False, TIME_ZONE='Europe/Helsinki'):
|
||||
|
|
@ -237,7 +245,10 @@ class test_Camera:
|
|||
hostname=ws[1]),
|
||||
Event('worker-offline', hostname=ws[0])]
|
||||
list(map(state.event, events))
|
||||
cam._last_worker_write.clear()
|
||||
# reset the date the last update was done
|
||||
models.WorkerState.objects.all().update(
|
||||
last_update=timezone.now() - timedelta(hours=1)
|
||||
)
|
||||
cam.on_shutter(state)
|
||||
|
||||
w1 = models.WorkerState.objects.get(hostname=ws[0])
|
||||
|
|
|
|||
20
tox.ini
20
tox.ini
|
|
@ -1,7 +1,7 @@
|
|||
[tox]
|
||||
envlist =
|
||||
py{py,27,34,35}-dj{18,19,110}
|
||||
py36-dj111
|
||||
tests-py{py,27,35}-dj111
|
||||
tests-py36-dj111
|
||||
apicheck
|
||||
builddocs
|
||||
flake8
|
||||
|
|
@ -10,12 +10,11 @@ envlist =
|
|||
pydocstyle
|
||||
readme
|
||||
|
||||
[travis]
|
||||
[gh-actions]
|
||||
python =
|
||||
2.7: py27, apicheck, builddocs, flake8, flakeplus, linkcheck, pydocstyle
|
||||
|
||||
[travis:after]
|
||||
travis = python: 3.5
|
||||
2.7: py27, flake8, flakeplus, pydocstyle
|
||||
3.5: py35
|
||||
3.6: py36, apicheck, builddocs, linkcheck
|
||||
|
||||
[testenv]
|
||||
sitepackages = False
|
||||
|
|
@ -24,16 +23,13 @@ deps=
|
|||
-r{toxinidir}/requirements/test.txt
|
||||
-r{toxinidir}/requirements/test-ci.txt
|
||||
|
||||
dj18: -r{toxinidir}/requirements/test-django18.txt
|
||||
dj19: -r{toxinidir}/requirements/test-django19.txt
|
||||
dj110: -r{toxinidir}/requirements/test-django110.txt
|
||||
dj111: -r{toxinidir}/requirements/test-django111.txt
|
||||
dj111: django>=1.11,<2
|
||||
|
||||
apicheck,builddocs,linkcheck: -r{toxinidir}/requirements/docs.txt
|
||||
flake8,flakeplus,manifest,pydocstyle,readme: -r{toxinidir}/requirements/pkgutils.txt
|
||||
|
||||
commands =
|
||||
py: py.test -xv --cov=django_celery_monitor --cov-report=term --cov-report=xml --no-cov-on-fail
|
||||
tests: pytest -xv --cov=django_celery_monitor --cov-report=term --cov-report=xml --no-cov-on-fail []
|
||||
apicheck: sphinx-build -W -b apicheck -d {envtmpdir}/doctrees docs docs/_build/apicheck
|
||||
builddocs: sphinx-build -b html -d {envtmpdir}/doctrees docs {envtmpdir}/html
|
||||
flake8: flake8 {toxinidir}/django_celery_monitor {toxinidir}/tests
|
||||
|
|
|
|||
Loading…
Reference in a new issue