Use select_for_update to update TaskState and WorkerState models.

This also replaces the in-process cache of when each worker was last
updated with a column called last_update in the worker_state table.
This commit is contained in:
Jannis Leidel 2017-05-10 10:48:24 +02:00
parent 93fbe8421c
commit 04131fecde
6 changed files with 157 additions and 47 deletions

View file

@ -1,13 +1,10 @@
"""The Celery events camera."""
from __future__ import absolute_import, unicode_literals
from collections import defaultdict
from datetime import timedelta
from celery import states
from celery.events.state import Task
from celery.events.snapshot import Polaroid
from celery.five import monotonic
from celery.utils.imports import symbol_by_name
from celery.utils.log import get_logger
from celery.utils.time import maybe_iso8601
@ -31,7 +28,6 @@ class Camera(Polaroid):
def __init__(self, *args, **kwargs):
super(Camera, self).__init__(*args, **kwargs)
self._last_worker_write = defaultdict(lambda: (None, None))
# Expiry can be timedelta or None for never expire.
self.app.add_defaults({
'monitors_expire_success': timedelta(days=1),
@ -74,16 +70,13 @@ class Camera(Polaroid):
return fromtimestamp(heartbeat)
def handle_worker(self, hostname_worker):
(hostname, worker) = hostname_worker
last_write, obj = self._last_worker_write[hostname]
if (not last_write or
monotonic() - last_write > self.worker_update_freq):
obj, _ = self.WorkerState.objects.update_or_create(
hostname=hostname,
defaults={'last_heartbeat': self.get_heartbeat(worker)},
)
self._last_worker_write[hostname] = (monotonic(), obj)
return obj
hostname, worker = hostname_worker
# was there an update in the last n seconds?
return self.WorkerState.objects.update_heartbeat(
hostname,
heartbeat=self.get_heartbeat(worker),
update_freq=self.worker_update_freq,
)
def handle_task(self, uuid_task, worker=None):
"""Handle snapshotted event."""
@ -113,29 +106,17 @@ class Camera(Polaroid):
if defaults[attr] is None]
return self.update_task(task.state, task_id=uuid, defaults=defaults)
def update_task(self, state, **kwargs):
objects = self.TaskState.objects
defaults = kwargs.pop('defaults', None) or {}
def update_task(self, state, task_id, defaults=None):
defaults = defaults or {}
if not defaults.get('name'):
return
obj, created = objects.get_or_create(defaults=defaults, **kwargs)
if created:
return obj
else:
if states.state(state) < states.state(obj.state):
keep = Task.merge_rules[states.RECEIVED]
defaults = dict(
(k, v) for k, v in defaults.items()
if k not in keep
)
return self.TaskState.objects.update_state(
state=state,
task_id=task_id,
defaults=defaults,
)
for k, v in defaults.items():
setattr(obj, k, v)
obj.save()
return obj
def on_shutter(self, state, commit_every=100):
def on_shutter(self, state):
def _handle_tasks():
for i, task in enumerate(state.tasks.items()):
@ -146,8 +127,10 @@ class Camera(Polaroid):
_handle_tasks()
def on_cleanup(self):
expired = (self.TaskState.objects.expire_by_states(states, expires)
for states, expires in self.expire_task_states)
expired = (
self.TaskState.objects.expire_by_states(states, expires)
for states, expires in self.expire_task_states
)
dirty = sum(item for item in expired if item is not None)
if dirty:
debug('Cleanup: Marked %s objects as dirty.', dirty)

View file

@ -0,0 +1,22 @@
from django.db.models import DateTimeField, Func
try:
    from django.db.models.functions import Now
except ImportError:
    class Now(Func):
        """A backport of the Now function from Django 1.9.x.

        Renders as ``CURRENT_TIMESTAMP`` so the timestamp is computed by
        the database rather than by the Python process.

        :param output_field: Optional model field describing the result
            type; a ``DateTimeField`` instance is used when omitted.
        :param extra: Additional keyword arguments passed on to ``Func``.
        """

        template = 'CURRENT_TIMESTAMP'

        def __init__(self, output_field=None, **extra):
            if output_field is None:
                output_field = DateTimeField()
            super(Now, self).__init__(output_field=output_field, **extra)

        def as_postgresql(self, compiler, connection):
            """Compile the expression for PostgreSQL backends."""
            # Postgres' CURRENT_TIMESTAMP means "the time at the start of the
            # transaction". We use STATEMENT_TIMESTAMP to be cross-compatible
            # with other databases.
            #
            # The template is passed per call instead of being assigned to
            # ``self.template`` so that compiling for Postgres does not
            # permanently change how this instance renders elsewhere.
            return self.as_sql(
                compiler, connection, template='STATEMENT_TIMESTAMP()',
            )

View file

@ -1,19 +1,72 @@
"""The model managers."""
from __future__ import absolute_import, unicode_literals
from datetime import timedelta
from celery import states
from celery.events.state import Task
from celery.utils.time import maybe_timedelta
from django.db import connections, models, router, transaction
from django.utils import timezone
from .compat import Now
class TaskStateManager(models.Manager):
"""A custom models manager for the TaskState model with some helpers."""
class ExtendedQuerySet(models.QuerySet):
    """A base queryset that implements a few helpful methods."""

    def connection_for_write(self):
        """Return the database connection that is configured for writing."""
        alias = router.db_for_write(self.model)
        return connections[alias]

    def select_for_update_or_create(self, defaults=None, **kwargs):
        """
        Fetch the object matching ``kwargs`` with a row lock and update it
        from ``defaults``; create it when no such object exists.

        Return a tuple ``(object, created)``, where ``created`` is a boolean
        specifying whether an object was created.

        This is a backport from Django 1.11
        (https://code.djangoproject.com/ticket/26804) to support
        select_for_update when getting the object.
        """
        if not defaults:
            defaults = {}
        lookup, params = self._extract_model_params(defaults, **kwargs)
        # Mark the queryset as writing before ``self.db`` is read below.
        self._for_write = True
        with transaction.atomic(using=self.db):
            try:
                obj = self.select_for_update().get(**lookup)
            except self.model.DoesNotExist:
                obj, created = self._create_object_from_params(lookup, params)
                if created:
                    return obj, created
                # Not created after all: fall through and update the object
                # the helper handed back.
            for name, value in defaults.items():
                # Callable defaults are evaluated at assignment time.
                if callable(value):
                    value = value()
                setattr(obj, name, value)
            obj.save(using=self.db)
        return obj, False
class WorkerStateQuerySet(ExtendedQuerySet):
    """A custom queryset for the WorkerState model with some helpers."""

    def update_heartbeat(self, hostname, heartbeat, update_freq):
        """Record a worker heartbeat, at most once per ``update_freq``.

        :param hostname: Hostname identifying the worker row.
        :param heartbeat: Value stored in ``last_heartbeat`` when a write
            is performed.
        :param update_freq: Number of seconds during which a previous
            update suppresses a new write.
        :returns: The matching worker state object, either the recently
            updated one or the freshly updated/created one.
        """
        with transaction.atomic():
            # The cutoff is an SQL expression, so "now" is the database's
            # clock rather than this process's.
            cutoff = Now() - timedelta(seconds=update_freq)
            recently_updated = self.filter(
                hostname=hostname,
                last_update__gte=cutoff,
            )
            if not recently_updated.exists():
                worker, _ = self.select_for_update_or_create(
                    hostname=hostname,
                    defaults={'last_heartbeat': heartbeat},
                )
                return worker
            # NOTE(review): latest() without a field name presumably relies
            # on the model's Meta.get_latest_by -- confirm it is set.
            return recently_updated.latest()
class TaskStateQuerySet(ExtendedQuerySet):
"""A custom model manager for the TaskState model with some helpers."""
def active(self):
"""Return all active task states."""
return self.filter(hidden=False)
@ -22,7 +75,7 @@ class TaskStateManager(models.Manager):
"""Return all expired task states."""
return self.filter(
state__in=states,
tstamp__lte=timezone.now() - maybe_timedelta(expires),
tstamp__lte=Now() - maybe_timedelta(expires),
)
def expire_by_states(self, states, expires):
@ -39,3 +92,23 @@ class TaskStateManager(models.Manager):
'DELETE FROM {0.db_table} WHERE hidden=%s'.format(meta),
(True, ),
)
def update_state(self, state, task_id, defaults):
    """Create or update the task state row for ``task_id``.

    :param state: The task state reported by the incoming event.
    :param task_id: The id used to look up the row.
    :param defaults: Mapping of field names to values to store.
    :returns: The created or updated task state object.

    When the incoming ``state`` compares lower than the stored one, the
    fields listed in ``Task.merge_rules[states.RECEIVED]`` are left
    untouched and only the remaining fields are updated.
    """
    # Mark the queryset as writing so ``self.db`` resolves to the database
    # the locked query below runs on.
    self._for_write = True
    with transaction.atomic(using=self.db):
        # Lock and fetch the row (or insert it) WITHOUT applying
        # ``defaults`` to an existing row. Using an update-or-create here
        # would overwrite and save every field first, so the comparison
        # below would see the already-replaced state and the merge rules
        # could never protect anything.
        obj, created = self.select_for_update().get_or_create(
            task_id=task_id,
            defaults=defaults,
        )
        if created:
            return obj

        if states.state(state) < states.state(obj.state):
            keep = Task.merge_rules[states.RECEIVED]
            defaults = {
                k: v for k, v in defaults.items() if k not in keep
            }

        for k, v in defaults.items():
            setattr(obj, k, v)
        obj.save()
        return obj

View file

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
    """Add the ``last_update`` date/time field to the ``workerstate`` model."""

    # Must run after the app's initial migration.
    dependencies = [
        ('celery_monitor', '0001_initial'),
    ]

    operations = [
        migrations.AddField(
            model_name='workerstate',
            name='last_update',
            field=models.DateTimeField(
                # Callable default used to fill in the new field.
                default=django.utils.timezone.now,
                auto_now=True,
                verbose_name='last update',
            ),
            # NOTE(review): per Django's AddField documentation this marks
            # the default above as temporary for this migration only (used
            # for already existing rows) -- confirm against the Django docs.
            preserve_default=False,
        ),
    ]

View file

@ -26,10 +26,11 @@ class WorkerState(models.Model):
#: A :class:`~datetime.datetime` describing when the worker was last seen.
last_heartbeat = models.DateTimeField(_('last heartbeat'), null=True,
db_index=True)
last_update = models.DateTimeField(_('last update'), auto_now=True)
#: A :class:`~django.db.models.Manager` instance
#: A :class:`~django_celery_monitor.managers.WorkerStateQuerySet` manager
#: to query the :class:`~django_celery_monitor.models.WorkerState` model.
objects = models.Manager()
objects = managers.WorkerStateQuerySet.as_manager()
class Meta:
"""Model meta-data."""
@ -106,7 +107,7 @@ class TaskState(models.Model):
#: A :class:`~django_celery_monitor.managers.TaskStateManager` instance
#: to query the :class:`~django_celery_monitor.models.TaskState` model.
objects = managers.TaskStateManager()
objects = managers.TaskStateQuerySet.as_manager()
class Meta:
"""Model meta-data."""

View file

@ -1,6 +1,6 @@
from __future__ import absolute_import, unicode_literals
from datetime import datetime
from datetime import datetime, timedelta
from itertools import count
from time import time
@ -69,11 +69,14 @@ class test_Camera:
def test_handle_worker(self):
worker = Worker(hostname='fuzzie')
worker.event('online', time(), time(), {})
self.cam._last_worker_write.clear()
old_last_update = timezone.now() - timedelta(hours=1)
models.WorkerState.objects.all().update(last_update=old_last_update)
m = self.cam.handle_worker((worker.hostname, worker))
assert m
assert m.hostname
assert m.last_heartbeat
assert m.last_update != old_last_update
assert m.is_alive()
assert str(m) == str(m.hostname)
assert repr(m)
@ -237,7 +240,10 @@ class test_Camera:
hostname=ws[1]),
Event('worker-offline', hostname=ws[0])]
list(map(state.event, events))
cam._last_worker_write.clear()
# reset the date the last update was done
models.WorkerState.objects.all().update(
last_update=timezone.now() - timedelta(hours=1)
)
cam.on_shutter(state)
w1 = models.WorkerState.objects.get(hostname=ws[0])