Async race conditions (Django, RQ)
Solving the case of the missing objects
tl;dr Beware of race conditions when using async queues and accessing data that may not yet have been committed by the main process.
Whenever we have to update an external system (pushing data out to our CRM system, sending notifications to HipChat, sending emails, that sort of thing) we send the data asynchronously by queueing up a function with Django-RQ.
If the queued function needs access to a Django object, we pass in the object id rather than the object itself, so that when the job is picked up by the queue worker process it always fetches the latest data before processing it.
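As a simplified sketch of that pattern - the Contact model and push_contact_to_crm function here are illustrative stand-ins, not our real code:

import django_rq
from myapp.tasks import push_contact_to_crm  # hypothetical job function

def contact_saved(contact):
    # enqueue only the primary key; the worker re-fetches the row,
    # so it always processes the latest version of the data
    django_rq.enqueue(push_contact_to_crm, contact.id)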
Over recent months we’ve seen a number of RQ jobs fail because they were unable to retrieve Django objects that we had previously saved:
As an example - we create a new object (from the process running on the main web dyno), and put a job in a Redis queue to push that object out over the wire to our CRM system. The job in the queue has the id of the newly created object as its argument - so we know that the object exists - it has an id.
When the Redis job runs, the first thing it does is retrieve the object from the database, but then fails with a Django Model.DoesNotExist error. The job then gets pushed onto a failed queue. We retry the failure, and it goes through fine, no error.
From our support ticket to Heroku
The error is a standard Django Model.DoesNotExist error - meaning that it can’t find the object specified by the ID that we pass to the function. Which is confusing, since the only reason we have an ID in the first place is that we have already saved the object. Otherwise it would be None, right?
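The worker side of that job looks something like this (again, the names are illustrative stand-ins):

from myapp.models import Contact  # hypothetical model

def push_contact_to_crm(contact_id):
    # runs on the RQ worker process, over a separate database
    # connection. If the web process's transaction has not committed
    # yet, this lookup raises Contact.DoesNotExist and the job lands
    # on the failed queue.
    contact = Contact.objects.get(pk=contact_id)
    send_to_crm(contact)  # hypothetical call out to the CRM API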
Yes, and no. It comes down to transaction management, and a race condition between the two parallel processes - committing the transaction, and looking up the data:
It appears to me that the job to process an object is enqueued before the transaction that creates the object is committed. This is a classic race condition: if the worker happens to be running fast enough, it will attempt to read the object before the create is committed, and find nothing. If the worker is running a tiny bit slower, the commit will happen first, and the read will find it.
Heroku's response (h/t Owen)
If we consider the synchronous function that generates and stores the ID:
1. Begin new transaction
2. Save object, generate new ID
3. End transaction, commit changes (including the new ID)
In between steps 2 and 3, the ID is only visible to the current transaction. So if we push the ID onto a queue which is then read by another process, outside of that transaction, that process won’t be able to see the object until step 3 is complete and the transaction is committed.
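A sketch of what that looks like from the web process’s point of view (using the same hypothetical Contact model as above):

from django.db import transaction
from myapp.models import Contact  # hypothetical model

with transaction.atomic():
    contact = Contact.objects.create(name="Alice")
    # contact.id is populated as soon as the save runs (step 2), but
    # the row is only visible inside this transaction. Any other
    # database connection - such as the RQ worker's - that looks it
    # up now will raise Contact.DoesNotExist.
    print(contact.id)
# only here (step 3), once the atomic block exits and commits, can
# other connections see the new row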
If this explanation is valid, it means that in between calling save() and the end of the transaction (which is the end of the outermost method decorated with @transaction.atomic) there is enough of a time gap for the message to be pushed onto the remote RQ queue, and for the second process to read that message off the queue and attempt to look up the object ID in the database. Since we are on Heroku, and our Redis server is a network hop away, it looks like we must be doing a lot of work in the gap.
Looking at the code, we were writing a further three database records (totalling four writes across three tables within the transaction), as well as pushing another message onto the queue (for sending emails):
@transaction.atomic
def foo():
    # Django Model.save() creates a new object ID
    obj_id = create_the_object_of_type_A()
    # add a job to Redis with the new ID, which has
    # yet to be committed to the database
    add_job_onto_queue(obj_id)
    # the worker process now has visibility of the job and
    # we are in a race condition - we need to finish the
    # method so that the transaction can be committed.
    # all of these function calls involve database or
    # network operations - and are potentially slow.
    create_an_object_of_type_B()
    create_another_object_of_type_B()
    create_an_object_of_type_C()
    look_up_an_object()
    add_another_job_to_the_queue()
    return
The complex solution to this is to take over explicit transaction management from Django, and to ensure that the new ID is committed before we do anything with it. Doing this opens you up to a world of pain - having the framework manage transactions is a large part of its value.
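For illustration only, here is a sketch of what that explicit approach could look like, using the same placeholder function names as the code above (and assuming foo() is not itself called inside an outer atomic block, in which case the inner block would only be a savepoint and nothing would be committed early):

from django.db import transaction

def foo():
    # commit the new object in its own, narrow transaction...
    with transaction.atomic():
        obj_id = create_the_object_of_type_A()
    # ...so that by the time the job is on the queue, the row is
    # already committed and visible to the worker process
    add_job_onto_queue(obj_id)
    # the remaining writes now live in a separate transaction, so they
    # no longer commit (or roll back) atomically with the object
    # created above - which is exactly the world of pain referred to
    with transaction.atomic():
        create_an_object_of_type_B()
        create_another_object_of_type_B()
        create_an_object_of_type_C()
        look_up_an_object()
        add_another_job_to_the_queue()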
The blunt solution is to move the problematic enqueue call further down in the function - thereby reducing the gap between the queueing and the end of the transaction to the minimum possible:
@transaction.atomic
def foo():
    # Django Model.save() creates a new object ID
    obj_id = create_the_object_of_type_A()
    # all of these function calls involve database or
    # network operations - and are potentially slow.
    create_an_object_of_type_B()
    create_another_object_of_type_B()
    create_an_object_of_type_C()
    look_up_an_object()
    add_another_job_to_the_queue()
    # putting the job onto the queue now still opens us
    # up to a race condition, but with a much smaller
    # window for conflict
    add_job_onto_queue(obj_id)
    return
The million dollar question is “Does it work?” We’ve only implemented this fix today, so we have yet to see how it has affected our failed jobs queue, but anecdotally we had two failures in the hour before the deployment, and none in the three hours since, so fingers crossed.
UPDATE: no failures yet, am declaring this experiment a success.