基于Redis优先级队列的异步任务编排
利用Django + Redis实现支持优先级、延迟执行和任务依赖的异步队列。 · 难度:入门 · +10XP
优先级任务编排引擎
相比Celery,本教程直接从零搭建轻量级异步队列。使用Redis的sorted set实现优先级调度(高优先级任务先执行),以及list结构实现延迟任务(基于TTL)。更高级的功能是任务依赖图:一个任务必须等待其前置任务完成后才能执行。你将编写一个QueueManager类,用于push、pop任务,并配合Django management command作为worker循环处理。
import redis
from django.conf import settings
import json
class PriorityQueue:
def __init__(self):
self.redis = redis.Redis(connection_pool=settings.REDIS_POOL)
def push(self, task_data, priority=5, delay=0):
task_id = task_data.get('id')
score = priority
if delay > 0:
score = time.time() + delay
self.redis.zadd('task_queue', {json.dumps(task_data): score})
def pop(self):
tasks = self.redis.zpopmin('task_queue', 1)
if tasks:
return json.loads(tasks[0][0])
return None