个人技术分享

一、前言

    使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。

    FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。

    总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。

    基础入门:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(一),本篇学习如何集成LangChain进行模型交互


二、术语

2.1.FastAPI

    FastAPI 是一个用于构建 API 的现代、快速(高性能)的 Python Web 框架。它是基于标准 Python 类型注释的 ASGI (Asynchronous Server Gateway Interface) 框架。

FastAPI 具有以下主要特点:

  1. 快速: FastAPI 使用 ASGI 服务器和 Starlette 框架,在性能测试中表现出色。它可以与 Uvicorn 一起使用,提供非常高的性能。

  2. 简单: FastAPI 利用 Python 类型注释,使 API 定义变得简单且直观。开发人员只需要定义输入和输出模型,FastAPI 会自动生成 API 文档。

  3. 现代: FastAPI 支持 OpenAPI 标准,可以自动生成 API 文档和交互式文档。它还支持 JSON Schema 和数据验证。

  4. 全功能: FastAPI 提供了路由、依赖注入、数据验证、安全性、测试等功能,是一个功能齐全的 Web 框架。

  5. 可扩展: FastAPI 被设计为可扩展的。开发人员可以轻松地集成其他库和组件,如数据库、身份验证等。

2.2.WebSocket

    是一种计算机通信协议,它提供了在单个 TCP 连接上进行全双工通信的机制。它是 HTML5 一个重要的组成部分。

WebSocket 协议主要有以下特点:

  1. 全双工通信:WebSocket 允许客户端和服务器之间进行双向实时通信,即数据可以同时在两个方向上流动。这与传统的 HTTP 请求-响应模型不同,HTTP 中数据只能单向流动。

  2. 持久性连接:WebSocket 连接是一种持久性的连接,一旦建立就会一直保持,直到客户端或服务器主动关闭连接。这与 HTTP 的连接是短暂的不同。

  3. 低开销:相比 HTTP 请求-响应模型,WebSocket 在建立连接时需要较少的数据交换,因此网络开销较小。

  4. 实时性:由于 WebSocket 连接是持久性的,且数据可以双向流动,因此 WebSocket 非常适用于需要实时、低延迟数据交互的应用场景,如聊天应用、实时游戏、股票行情等。

2.3.LangChain

    是一个全方位的、基于大语言模型这种预测能力的应用开发工具。LangChain的预构建链功能,就像乐高积木一样,无论你是新手还是经验丰富的开发者,都可以选择适合自己的部分快速构建项目。对于希望进行更深入工作的开发者,LangChain 提供的模块化组件则允许你根据自己的需求定制和创建应用中的功能链条。

    LangChain本质上就是对各种大模型提供的API的套壳,是为了方便我们使用这些 API,搭建起来的一些框架、模块和接口。

   LangChain的主要特性:
        1.可以连接多种数据源,比如网页链接、本地PDF文件、向量数据库等
        2.允许语言模型与其环境交互
        3.封装了Model I/O(输入/输出)、Retrieval(检索器)、Memory(记忆)、Agents(决策和调度)等核心组件
        4.可以使用链的方式组装这些组件,以便最好地完成特定用例。
        5.围绕以上设计原则,LangChain解决了现在开发人工智能应用的一些切实痛点。

2.4.流式输出

    是模型推理过程中逐步生成输出结果,而非一次性生成整个输出,从而实现更低的延迟和更好的实时性。


三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加LangChain的依赖包

conda create -n fastapi_test python=3.10
conda activate fastapi_test
pip install fastapi websockets uvicorn
pip install --quiet  langchain-core langchain-community langchain-openai

四、技术实现

4.1. 集成LangChain

    本章代码将在开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(二)基础上进行拓展

服务端:

import uvicorn
import os

from typing import Annotated
from fastapi import (
    Depends,
    FastAPI,
    WebSocket,
    WebSocketException,
    WebSocketDisconnect,
    status,
)

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'    #你的Open AI Key

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

app = FastAPI()

async def authenticate(
    websocket: WebSocket,
    userid: str,
    secret: str,
):
    if userid is None or secret is None:
        raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)

    print(f'userid: {userid},secret: {secret}')
    if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:
        return 'pass'
    else:
        return 'fail'

async def chat(query):
    template = "Question: {query}"
    prompt = ChatPromptTemplate.from_template(template)
    model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7, max_tokens=1024)
    chain = prompt | model
    async for response in chain.astream(query):
        print(response.content, flush=True)
        yield response.content

@app.websocket("/ws")
async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):
    await manager.connect(websocket)
    try:
        while True:
            text = await websocket.receive_text()

            if 'fail' == permission:
                await manager.send_personal_message(
                    f"authentication failed", websocket
                )
            else:
                if text is not None and len(text) > 0:
                    async for msg in chat(text):
                        await manager.send_personal_message(msg, websocket)

    except WebSocketDisconnect:
        manager.disconnect(websocket)
        print(f"Client #{userid} left the chat")
        await manager.broadcast(f"Client #{userid} left the chat")

if __name__ == '__main__':
    uvicorn.run(app, host='0.0.0.0',port=7777)

客户端:

<!DOCTYPE html>
<html>
    <head>
        <title>Chat</title>
    </head>
    <body>
        <h1>WebSocket Chat</h1>
        <form action="" onsubmit="sendMessage(event)">
            <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label>
            <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>
            <br/>
            <button onclick="connect(event)">Connect</button>
            <hr>
            <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>
            <button>Send</button>
        </form>
        <ul id='messages'>
        </ul>
        <script>
            var ws = null;
            function connect(event) {
                var userid = document.getElementById("userid")
                var secret = document.getElementById("secret")
                ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);
                ws.onmessage = function(event) {
                    var messages = document.getElementById('messages')
                    var message = document.createElement('li')
                    var content = document.createTextNode(event.data)
                    message.appendChild(content)
                    messages.appendChild(message)
                };
                event.preventDefault()
            }
            function sendMessage(event) {
                var input = document.getElementById("messageText")
                ws.send(input.value)
                input.value = ''
                event.preventDefault()
            }
        </script>
    </body>
</html>

调用结果:

用户输入:你好

模型输出:你好!有什么我可以帮忙的吗?

用户输入:广州有什么好玩的地方?

模型输出:广州有很多好玩的地方,比如:1.珠江夜游:可以乘船游览珠江两岸的灯光景观,欣赏广州夜景。2.越秀公园:是广州最大的市区中心公园,有美丽的湖泊和花园,适合休闲散步。3.广州塔:是广州的标志性建筑,可以乘坐观光电梯俯瞰整个城市。4.白云山:是广州市内的一座著名山峰,可以登山欣赏壮丽的风景。5.越秀山:是广州的另一座著名山峰,登山可以俯瞰整个城市。6.荔枝湾:是广州著名的美食街,可以品尝各种地道的广州美食。7.广州博物馆:展示了广州的历史和文化,是了解广州的好去处。希望以上信息能帮助到您,祝您在广州玩得愉快!

PS:

1. 在AI交互中,LangChain框架并不是必须引入,此处引用仅用于简化Openai的交互流程。

2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。