Running rq in sync mode
A run of pickling problems with our tests has required us to delve into the rq and django-rq source code. If you run rq in synchronous mode, queued functions are run directly by the enqueue method, but redis is still used to store 'job' information.
We use the redis -> rq -> django-rq chain to run all our async processes, and a recent update led me to change our code to take advantage of the async=False mode, whereby jobs are run synchronously, bypassing worker processes altogether. This way every job goes through rq whether or not it is queued, which makes things a bit more consistent.
This code:
import django_rq
from django.conf import settings

# queue is always async - a worker process is required to process jobs
queue = django_rq.get_queue()

def do_something():
    # settings flag used to determine whether to queue jobs
    if settings.QUEUE_FUNCTION:
        # NB pass the function itself, not the result of calling it
        queue.enqueue(_do_the_real_thing)
    else:
        _do_the_real_thing()

def _do_the_real_thing():
    pass
becomes:
# use settings flag once on queue creation
queue = django_rq.get_queue(async=settings.QUEUE_FUNCTION)

def do_something():
    # this is the visible function
    queue.enqueue(_do_the_real_thing)

def _do_the_real_thing():
    # this is where the work gets done
    pass
It was a relatively simple change, but unfortunately it unleashed a world of pain when all of our tests (that involved rq) started failing. The cause of the errors was a pickling problem, caused by our use of mock dates to simulate various date-dependent scenarios. Getting to the bottom of this involved digging into the rq source to find out exactly what happens when async=False (which is how our tests run).
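To see why mocks and pickling don't mix, here is a minimal illustration outside of rq entirely (using the standalone mock library; unittest.mock in Python 3). Every Mock instance is given its own dynamically created class, which pickle cannot look up by name:

import pickle
import mock

mock_date = mock.Mock()  # e.g. a mocked-out datetime.date
# each Mock instance gets its own dynamically created class, so
# pickle cannot find that class by name and refuses to serialise it
pickle.dumps(mock_date)  # raises pickle.PicklingError

If a queued function's return value contains anything like this, any attempt by rq to pickle that result will blow up in the same way.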
The two classes used when running in sync mode are Job and Queue. When you call the queue.enqueue() method, a Job is created that contains the function call and any function args / kwargs that you pass in. In normal (async) mode, this job structure is pickled and stored in redis for subsequent processing by a worker process. When running in sync mode a job is still created, but instead of saving the job for later consumption, the job function is invoked immediately (comments are mine where indicated):
def enqueue_job(self, job, timeout=None, set_meta_data=True):
    """Enqueues a job for delayed execution.

    When the `timeout` argument is sent, it will overrides the default
    timeout value of 180 seconds.  `timeout` may either be a string or
    integer.

    If the `set_meta_data` argument is `True` (default), it will update
    the properties `origin` and `enqueued_at`.

    If Queue is instantiated with async=False, job is executed immediately.
    """
    if set_meta_data:
        job.origin = self.name
        job.enqueued_at = times.now()

    if timeout:
        job.timeout = timeout  # _timeout_in_seconds(timeout)
    else:
        job.timeout = 180  # default

    # (HRB) at this point the job._result attribute is None, and so
    # it is not pickled in the save() method.
    job.save()
    if self._async:
        self.push_job_id(job.id)
    else:
        # (HRB) this is where your queued function is called if running
        # in async=False mode.
        job.perform()
        # (HRB) at this point, the job._result will contain the output of
        # your function call, which means that the save() method will
        # attempt to pickle it. If there is anything in the function that
        # causes pickling issues (e.g. a mocked out date), this will fail,
        # but the function *will* have been called.
        job.save()
    return job
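To make the sync code path concrete, here is a minimal sketch (the add function and its arguments are illustrative, and a configured django-rq / redis setup is assumed) showing that with async=False the function runs inline inside enqueue(), and its return value is then pickled into redis by the second job.save():

import django_rq

queue = django_rq.get_queue(async=False)

def add(a, b):
    return a + b

# enqueue() calls job.perform() inline - no worker process involved
job = queue.enqueue(add, 1, 2)
# the return value has been pickled into redis by the second job.save()
print(job.result)  # 3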
The key (for our failures) was the nature of the job.save() method, and its interaction with redis / pickling:
def save(self, pipeline=None):
    """Persists the current job instance to its corresponding Redis key."""
    key = self.key
    connection = pipeline if pipeline is not None else self.connection

    obj = {}
    obj['created_at'] = times.format(self.created_at or times.now(), 'UTC')

    if self.func_name is not None:
        obj['data'] = dumps(self.job_tuple)
    if self.origin is not None:
        obj['origin'] = self.origin
    if self.description is not None:
        obj['description'] = self.description
    if self.enqueued_at is not None:
        obj['enqueued_at'] = times.format(self.enqueued_at, 'UTC')
    if self.ended_at is not None:
        obj['ended_at'] = times.format(self.ended_at, 'UTC')
    if self._result is not None:
        # (HRB) after job.perform() is called, this will contain data
        obj['result'] = dumps(self._result)
    if self.exc_info is not None:
        obj['exc_info'] = self.exc_info
    if self.timeout is not None:
        obj['timeout'] = self.timeout
    if self.result_ttl is not None:
        obj['result_ttl'] = self.result_ttl
    if self._status is not None:
        obj['status'] = self._status
    if self.meta:
        obj['meta'] = dumps(self.meta)

    connection.hmset(key, obj)
The important take-away from this is that even when running in sync mode, rq will use redis (and pickling) even though the job itself is run inline. In addition, if anything in your queued function's result causes a pickling issue (obj['result'] = dumps(self._result) above), be aware that the function will already have been called, which may leave you with inconsistent data / side-effects.
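Put together, this means a test can fail on the pickling step after its side-effects have already happened. A minimal sketch of the failure mode (the do_work function, and the list standing in for a database write, are illustrative):

import django_rq
import mock

queue = django_rq.get_queue(async=False)
rows = []

def do_work():
    rows.append('written')  # stand-in for a real side-effect (e.g. a db write)
    return mock.Mock()      # an unpicklable result, like a mocked-out date

try:
    queue.enqueue(do_work)  # job.perform() succeeds, then job.save() raises
except Exception:           # the pickling error from dumps(self._result)
    pass

assert rows == ['written']  # the side-effect happened before the failure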