Django + Strawberry GraphQL Subscription 与 ASGI 长连接模式
使用 Strawberry GraphQL 实现 Subscription 订阅,基于 ASGI 的 Channel Layer 推送实时数据,并处理连接生命周期。 · 难度:入门 · +10XP
Django + Strawberry GraphQL Subscription 与 ASGI 长连接模式
多数教程只讲 Query 和 Mutation。本课程深度讲解如何基于 Strawberry 的 GraphQL Subscription 类型,结合 Django Channels 的 Layer 实现 pub/sub。你将编写一个自定义的 AsyncGenerator,监听 Redis channel 并在数据更新时推送至客户端。同时处理 WebSocket 断线重连与心跳包,以及在 subscription 解析器中执行数据库查询的最佳实践。
import strawberry
from strawberry.types import Info
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
@strawberry.type
class Subscription:
@strawberry.subscription
async def message_sent(self, info: Info, room: str) -> str:
channel_layer = get_channel_layer()
async with channel_layer.
group_add(room, 'some_channel'):
# 实际监听循环
while True:
msg = await channel_layer.receive('some_channel')
yield msg['text']