异步生成器与管道:构建流式数据处理管线
利用 async for 和异步生成器实现背压感知的数据管线,组合多个处理阶段,处理无限数据流而不耗尽内存,并支持优雅取消。 · 难度:入门 · +10XP
异步生成器与管道:构建流式数据处理管线
在数据处理中,常常需要从网络或磁盘流式读取数据,经过多个变换后输出。本教程教你使用 async def 和 yield 定义异步生成器,然后通过 asyncio 的 Queue 或直接嵌套 async for 来连接这些生成器形成管线。你将实现一个支持背压的日志处理系统,其中每个处理阶段可以独立控制并发度,并且整个管线可以通过 asyncio.CancelledError 优雅关闭。
async def read_lines(reader):
while True:
line = await reader.readline()
if not line:
break
yield line.decode().strip()
async def filter_keyword(lines, keyword):
async for line in lines:
if keyword in line:
yield line
async def main():
reader = await asyncio.open_connection('localhost', 9999)
lines = read_lines(reader[0])
error_lines = filter_keyword(lines, 'ERROR')
async for err in error_lines:
print(err)