Running rq in sync mode
A run of pickling problems with our tests has required us to delve into the rq and django-rq source code. If you run rq in synchronous mode, queued functions are run directly by the enqueue method, but redis is still used to store 'job' information.
We use the redis -> rq -> django-rq chain to run all our async processes, and a recent update led me to change our code to take advantage of the async=False mode, whereby jobs are run synchronously, bypassing worker processes altogether. By doing this we run all the jobs through rq which makes things a bit more consistent.
This code:
import django_rq
from django.conf import settings

# queue is always async - a worker process is required to process jobs
queue = django_rq.get_queue()

def do_something():
    # settings flag used to determine whether to queue jobs
    if settings.QUEUE_FUNCTION:
        # pass the function itself; calling it here would run it inline
        queue.enqueue(_do_the_real_thing)
    else:
        _do_the_real_thing()

def _do_the_real_thing():
    pass
becomes:
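import django_rq
from django.conf import settings

# use settings flag once on queue creation
# (newer rq / django-rq releases spell this is_async, because async is a
# reserved word from Python 3.7 onwards)
queue = django_rq.get_queue(async=settings.QUEUE_FUNCTION)

def do_something():
    # this is the visible function; pass the callable, do not call it
    queue.enqueue(_do_the_real_thing)

def _do_the_real_thing():
    # this is where the work gets done
    pass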
It was a relatively simple change, but unfortunately it unleashed a world of pain when all of our tests (that involved rq) started failing. The errors came from a pickling problem triggered by our use of mock dates to simulate various date-dependent scenarios. Getting to the bottom of this involved digging into the rq source to find out exactly what happens when async=False (which is how our tests run).
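To see why a mocked value can trip up pickling, here is a minimal sketch using only the standard library. It assumes a plain `unittest.mock.MagicMock` as a stand-in for a mocked date; the mocks in our tests may have been built differently, and `try_pickle` is a hypothetical helper written for this illustration.

```python
import pickle
from datetime import date
from unittest import mock


def try_pickle(value):
    """Return None if value pickles cleanly, otherwise the exception raised."""
    try:
        pickle.dumps(value)
    except Exception as exc:  # the exact exception type varies by Python version
        return exc
    return None


# A real date pickles without complaint.
real_date_error = try_pickle(date(2013, 1, 1))

# A MagicMock standing in for a date does not (assumed stand-in, see above).
mock_date_error = try_pickle(mock.MagicMock())

print("real date:", real_date_error)
print("mock date:", mock_date_error)
```

Anything a queued function returns has to survive this same `pickle.dumps` call, so a mock leaking into the return value is enough to break the job.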
The two classes used when running in sync mode are Job and Queue. When you call the queue.enqueue() method, a job is created that contains the function call and any function args / kwargs that you pass in. In normal (async) mode, this job structure is pickled and stored in redis for subsequent processing by a worker process. When running in sync mode, a job is still created, but instead of being saved for later consumption, the job function is invoked immediately (comments are mine where indicated):
def enqueue_job(self, job, timeout=None, set_meta_data=True):
    """Enqueues a job for delayed execution.
    When the `timeout` argument is sent, it will overrides the default
    timeout value of 180 seconds.  `timeout` may either be a string or
    integer.
    If the `set_meta_data` argument is `True` (default), it will update
    the properties `origin` and `enqueued_at`.
    If Queue is instantiated with async=False, job is executed immediately.
    """
    if set_meta_data:
        job.origin = self.name
        job.enqueued_at = times.now()
    if timeout:
        job.timeout = timeout  # _timeout_in_seconds(timeout)
    else:
        job.timeout = 180  # default
    # (HRB) at this point the job._result attribute is None, and so
    # it is not pickled in the save() method.
    job.save()
    if self._async:
        self.push_job_id(job.id)
    else:
        # (HRB) this is where your queued function is called if running
        # in async=False mode.
        job.perform()
        # (HRB) at this point, the job._result will contain the output of
        # your function call, which means that the save() method will 
        # attempt to pickle it. If there is anything in the function that
        # causes pickling issues (e.g. a mocked out date), this will fail,
        # but the function *will* have been called.
        job.save()
    return job
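As a rough illustration of what "a job is pickled and stored for a worker" means, the sketch below serialises a function reference plus its arguments and then rebuilds and runs the call. The tuple layout and the helper names (`make_job_data`, `run_job_data`) are assumptions made for this example; rq's real job structure differs in detail.

```python
import importlib
import pickle


def make_job_data(func_path, *args, **kwargs):
    """Pickle a dotted function path together with its call arguments."""
    return pickle.dumps((func_path, args, kwargs))


def run_job_data(data):
    """Unpickle the job data, import the function by name and call it."""
    func_path, args, kwargs = pickle.loads(data)
    module_name, _, attr = func_path.rpartition(".")
    func = getattr(importlib.import_module(module_name), attr)
    return func(*args, **kwargs)


# What would be written to redis: bytes, not a live function object.
data = make_job_data("math.sqrt", 16)

# What a worker (or the sync path) would do with those bytes.
result = run_job_data(data)
print(result)
```

The point to notice is that everything crossing this boundary, arguments on the way in and results on the way out, has to be picklable.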
The key (for our failures) was the nature of the job.save() method, and its interaction with redis / pickling:
def save(self, pipeline=None):
    """Persists the current job instance to its corresponding Redis key."""
    key = self.key
    connection = pipeline if pipeline is not None else self.connection
    obj = {}
    obj['created_at'] = times.format(self.created_at or times.now(), 'UTC')
    if self.func_name is not None:
        obj['data'] = dumps(self.job_tuple)
    if self.origin is not None:
        obj['origin'] = self.origin
    if self.description is not None:
        obj['description'] = self.description
    if self.enqueued_at is not None:
        obj['enqueued_at'] = times.format(self.enqueued_at, 'UTC')
    if self.ended_at is not None:
        obj['ended_at'] = times.format(self.ended_at, 'UTC')
    if self._result is not None:
        # (HRB) after job.perform is called, this will contain data
        obj['result'] = dumps(self._result)
    if self.exc_info is not None:
        obj['exc_info'] = self.exc_info
    if self.timeout is not None:
        obj['timeout'] = self.timeout
    if self.result_ttl is not None:
        obj['result_ttl'] = self.result_ttl
    if self._status is not None:
        obj['status'] = self._status
    if self.meta:
        obj['meta'] = dumps(self.meta)
    connection.hmset(key, obj)
The important take-away from this is that rq still uses redis (and pickling) in sync mode, even though the job itself is run inline. In addition, if anything in your queued function's result causes a pickling issue (obj['result'] = dumps(self._result) above), be aware that the function will already have been called by the time the error is raised, which may leave you with inconsistent data / side-effects.
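The ordering problem can be reproduced without redis at all. The sketch below mimics the save / perform / save sequence from enqueue_job with a hypothetical `FakeJob` class that writes to a dict instead of redis; it is a simplified model of the behaviour described above, not rq's actual implementation, and the unpicklable result is again an assumed `MagicMock`.

```python
import pickle
from unittest import mock

side_effects = []  # stands in for database writes, emails sent, etc.


def do_the_real_thing():
    side_effects.append("row written")
    # Hypothetical unpicklable return value, e.g. a mocked date leaking out.
    return mock.MagicMock()


class FakeJob:
    """Hypothetical stand-in for rq's Job, storing to a dict instead of redis."""

    def __init__(self, func):
        self.func = func
        self._result = None
        self.store = {}

    def perform(self):
        self._result = self.func()
        return self._result

    def save(self):
        # Mirrors the check in save(): only pickle a result once one exists.
        if self._result is not None:
            self.store["result"] = pickle.dumps(self._result)


def enqueue_sync(job):
    job.save()     # result is still None, so nothing is pickled
    job.perform()  # the queued function runs inline, side effects happen
    job.save()     # the result is pickled here, and this is where it fails
    return job


job = FakeJob(do_the_real_thing)
try:
    enqueue_sync(job)
    save_failed = False
except Exception:
    save_failed = True

print("save failed:", save_failed)
print("side effects:", side_effects)
```

The side effect is recorded even though the enqueue call raised, which is exactly the inconsistency to watch for. Returning None, or only plain picklable values, from queued functions sidesteps the second save entirely.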
