FastAPI的曲折之路
FastAPI支持多种传输数据的方式,其中有http响应,以及websocket等等…最开始,我使用的是http模式,直接@app.get 来定义FastAPI接口。我发现,这种模式要求的是,前端发送消息,后端收取消息并响应。我在定义的时候,发现了这种模式在我项目中的局限性。
首先,我的主程序和FastAPI不好解耦,程序逻辑都是写在定义api的语句中,让我后续扩展维护程序带来相当多的麻烦。其次,我无法实现程序与客户端的持续性互动,因为http模式的输入-输出,导致我要固定住处理数据的逻辑,并且只有前端发来数据时,我才能返回数据。
尽管,我找到了一些改进的方法,就是使用FastAPI的依赖注入功能(简单来说,就是API可以定义在一个文件中,然后另一个文件可以通过特定语句导入它,实现api和业务的分离)。但是,前端发送信息,后端就要响应信息这一底层逻辑没有变,使得前后端仍然 “藕断丝连”
新思路的探索
随即,我查阅到了FastAPI的另一种连接模式:websocket。websocket是一种双向通信模式,前端和后端可以通过websocket,维持长连接。相比于http模式,websocket的长连接更符合我的程序需求。这是,我想到,如果使用内存作为API和主程序的桥梁呢?websocket维持长连接,而内存则负责向我的主程序共享数据。这样的话,我的业务逻辑就能和API解耦,同时,我的整个项目的前后端又能通过API来解耦。
进一步的实现
话不多说,上代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| from fastapi import APIRouter, WebSocket, WebSocketDisconnect, FastAPI import aioredis from typing import List from contextlib import asynccontextmanager
@asynccontextmanager async def lifespan(app: FastAPI): global redis_pool try: redis_pool = await aioredis.from_url("redis://localhost", max_connections=10) yield finally: if redis_pool: await redis_pool.close()
app = FastAPI(lifespan=lifespan)
active_connections: List[WebSocket] = []
redis_pool = None
async def get_redis(): if redis_pool is None: raise RuntimeError("Redis connection pool is not initialized") return redis_pool
@app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): redis = await get_redis() await websocket.accept() active_connections.append(websocket) try: file_data = b"" while True: data = await websocket.receive_bytes() if data == b"file_complete": await websocket.send_text("File transfer complete") await redis.set("raw_file", file_data) await websocket.send_text("Ready for next task") pubsub = redis.pubsub() await pubsub.subscribe("processed_data_ready") async for message in pubsub.listen(): if message["type"] == "message": processed_data = await redis.get("processed_file") await websocket.send_text(f"Processed data: {processed_data.decode('utf-8')}") break
file_data = b"" else: file_data += data await websocket.send_text("File chunk received")
except WebSocketDisconnect: active_connections.remove(websocket) await websocket.close()
finally: if websocket in active_connections: active_connections.remove(websocket)
|
我在这个文件中,启动了redis服务,并且定义了API。之后运行这个文件,API接受到的数据就会向内存里传输,同时我的主程序就可以读取redis 内存库中的数据,并将这个数据进行处理,处理后发送回redis。API代码逻辑出读取到主程序文件返回的数据,再将数据发送回去