Django: Fixing race condition when queuing with on_commit hook

Kamal Mustafa - Apr 12 '18 - Dev Community

This race condition is already explained in the Celery docs.

We still bumped into it after recently rolling out django-q to production. Let's say we have a function like this:

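from django.db import transaction
# django-q's async() was later renamed async_task(), since async is a
# reserved word from Python 3.7 onwards.
from django_q.tasks import async_task

@transaction.atomic
def send_msg(recipient, text, msgid=None, queue=False):
    # get_or_create() returns an (object, created) tuple
    msg, _created = Message.objects.get_or_create(msgid=msgid)
    if queue:
        # Problem: the task is enqueued while the transaction is still open
        async_task('send_msg', recipient, text, msg.msgid, queue=False)
        msg.status = 'QUEUED'
        msg.save()
        return msg

    # pass msg to backend gateway
    send_to_gateway(recipient, msg, ....)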

Once we had the qcluster running, we started seeing "Duplicate entry" errors in the log. It looks like when we call async() (async_task() in later django-q releases), the cluster picks up the task before the transaction is committed, so the get_or_create() in the worker does not see the existing row and tries to create a new object with the same msgid instead of getting the existing one. We fixed that with transaction.on_commit(), available since Django 1.9:

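from django.db import transaction
# async_task() is the later name of django-q's async()
from django_q.tasks import async_task

@transaction.atomic
def send_msg(recipient, text, msgid=None, queue=False):
    # get_or_create() returns an (object, created) tuple
    msg, _created = Message.objects.get_or_create(msgid=msgid)
    if queue:
        def _add_to_queue():
            async_task('send_msg', recipient, text, msg.msgid, queue=False)

        msg.status = 'QUEUED'
        msg.save()
        # Runs _add_to_queue only after the transaction has committed
        transaction.on_commit(_add_to_queue)
        return msg

    # pass msg to backend gateway
    send_to_gateway(recipient, msg, ....)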
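
To make the behaviour of the hook concrete, here is a minimal pure-Python sketch of the semantics that transaction.on_commit() gives us: callbacks registered inside the block run only after it exits cleanly, and are dropped if it raises. ToyConnection and its methods are hypothetical stand-ins written for illustration, not Django's implementation, and nested transactions and savepoints are ignored.

```python
from contextlib import contextmanager


class ToyConnection:
    """Hypothetical stand-in for a database connection (not Django's API)."""

    def __init__(self):
        self.in_transaction = False
        self._callbacks = []

    def on_commit(self, func):
        if self.in_transaction:
            # Defer until the surrounding transaction commits.
            self._callbacks.append(func)
        else:
            # No open transaction: run right away.
            func()

    @contextmanager
    def atomic(self):
        self.in_transaction = True
        try:
            yield
        except BaseException:
            # Rollback: registered callbacks are discarded, never run.
            self._callbacks.clear()
            self.in_transaction = False
            raise
        # Commit: the transaction is over, so run callbacks in order.
        self.in_transaction = False
        callbacks, self._callbacks = self._callbacks, []
        for func in callbacks:
            func()


conn = ToyConnection()

events = []
with conn.atomic():
    conn.on_commit(lambda: events.append("enqueue"))  # registered first...
    events.append("save")                             # ...but runs after this

rolled_back = []
try:
    with conn.atomic():
        conn.on_commit(lambda: rolled_back.append("enqueue"))
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

print(events)       # ['save', 'enqueue']
print(rolled_back)  # []
```

In this toy model the "enqueue" step always lands after "save", and a failed transaction never enqueues anything, which is the property the fixed send_msg() relies on.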

We use SQS, but it doesn't matter which broker you use. What matters is whether your task depends on something in the database. In our case above, the flow is basically:

Web request

  1. The user sends a message.
  2. We create a record in the db, with a unique id, to represent the sent message.
  3. We async() the function that sends the message, with the message id as one of the arguments.
  4. We commit the transaction.

Queue worker

  1. The send message function is invoked from the qcluster.
  2. It tries to get the message by message id, or creates a new one if it doesn't exist.
  3. It sends the message and updates the status to SENT.
  4. It commits the transaction.

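The race happens between step 3 of the web request and step 2 of the queue worker: the worker can look for the record before step 4 of the web request has committed it. The rule of thumb is that if you're using a transaction, async() should be called outside the transaction (for example from an on_commit hook), not inside it.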
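
The two orderings can be sketched as a deliberately worst-case, single-threaded model. Everything here is hypothetical and simplified: the sets stand in for rows that are visible or not yet visible to other database sessions, and worker_outcome() only reports whether the worker's lookup would find the record or try to create it again.

```python
def worker_outcome(enqueue_inside_transaction):
    """Return what the worker's get_or_create() does for message 'm1'."""
    committed = set()    # rows visible to every database session
    web_pending = set()  # rows written by the web request, not yet committed

    # Web request, step 2: create the record inside the open transaction.
    web_pending.add("m1")

    def worker_get_or_create():
        # Queue worker, step 2: it only sees committed rows.
        return "get" if "m1" in committed else "create"

    if enqueue_inside_transaction:
        outcome = worker_get_or_create()  # worker runs before the commit
        committed |= web_pending          # web request, step 4
    else:
        committed |= web_pending          # commit first
        outcome = worker_get_or_create()  # task was enqueued on commit
    return outcome


print(worker_outcome(enqueue_inside_transaction=True))   # create
print(worker_outcome(enqueue_inside_transaction=False))  # get
```

The "create" outcome is the one that ends in a duplicate entry error once both sides have written a row with the same msgid.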


Originally posted in an issue on GitHub.
