用PostgreSQL SKIP LOCKED实现轻量级任务队列
不依赖Celery或RabbitMQ,仅利用Django ORM + PostgreSQL的SKIP LOCKED特性,构建一个可靠、可监控的数据库级任务队列,适用于中小型项目。 · 难度:入门 · +10XP
SKIP LOCKED解决竞争
传统数据库轮询任务会导致重复执行与锁竞争。PostgreSQL 9.5+提供的SELECT ... FOR UPDATE SKIP LOCKED可以原子地获取并锁定下一个待执行任务,其他worker自动跳过已锁行。本教程将展示如何用Django ORM的raw()或自定义QuerySet实现此逻辑,并添加任务重试、超时、死信队列等功能,所有元数据都存储在模型字段中,无需额外中间件。
# tasks/utils.py
from django.db import transaction
from .models import Task
def acquire_task():
with transaction.atomic():
task = Task.objects.filter(status='pending')\
.select_for_update(skip_locked=True)\
.order_by('created_at').first()
if task:
task.status = 'running'
task.save(update_fields=['status'])
return task