LLM前端应用技术选型
- 工程开发:
- 服务端:主要负责持续接收相关内容,并将其发送至前端。
- 前端:则持续接收来自服务端的内容,并进行渲染展示。
- 持续性接收的实现方案:
- 内容呈现为一段一段地输出文本,在 HTTP 范式下,存在两种常见方案,即 Server - Sent Events(服务器发送事件 )和 Websocket。
SSE接入的具体落地有几种方案
1. EventSource 实现接入
EventSource 是一个 Web API,是浏览器提供的原生对象,专门用于处理 SSE。使用简单,对 SSE 协议有良好的支持,自动处理连接的保持、重连等机制。
EventSource 基于 HTTP 协议实现,通过与服务器建立一个持续连接,实现了服务器向客户端推送事件数据的功能。
在客户端,EventSource 对象通过一个 URL 发起与服务器的连接。连接成功后,服务器可以向客户端发送事件数据。在客户端,通过 EventSource 对象注册事件处理函数,以接收来自服务器的事件数据。
const eventSource = new EventSource('http://localhost:3000');
eventSource.addEventListener('message', (event) => {
console.log('收到消息:', event.data);
document.getElementById('messages').innerHTML += `<p>${event.data}</p>`;
});
eventSource.onopen = () => console.log('连接已打开');
eventSource.onerror = () => {
console.error('连接中断,将自动重连');
};
EventSource API 是用于服务器发送事件(SSE)的客户端接口。它不支持发起 POST 请求。如果需要发送数据到服务器以便它能够推送事件给客户端,应该使用标准的 AJAX 请求(例如使用 Fetch API、XMLHttpRequest API)或者WebSocket。
2. XMLHttpRequest 实现接入
虽然 XMLHttpRequest 对象通常用于发送 HTTP 请求并接收响应,但它并不是专门设计用来处理 Server-Sent Events (SSE) 的。SSE 是一种基于 HTTP 的实时推送技术,可以使用 XMLHttpRequest 来模拟 SSE 的行为。
function startSSE() {
// 创建 XMLHttpRequest 对象
const xhr = new XMLHttpRequest();
// 配置请求
xhr.open('POST', 'http://localhost:3000/sse', true);
xhr.setRequestHeader('Content-Type', 'application/json');
// 发送请求数据(可以发送一些初始数据)
const requestData = { clientId: '12345', message: 'Hello, server!' };
xhr.send(JSON.stringify(requestData));
// 监听服务器推送的数据
xhr.onreadystatechange = () => {
if (xhr.readyState === XMLHttpRequest.LOADING) {
// 处理流式数据
const responseText = xhr.responseText;
const messages = responseText.split('\n\n'); // SSE 数据以两个换行符分隔
messages.forEach((message) => {
if (message.startsWith('data:')) {
const data = message.slice(5).trim(); // 去掉 "data:" 前缀
console.log('Received:', data);
}
});
}
};
// 监听错误
xhr.onerror = () => {
console.error('Error occurred during SSE connection');
};
// 监听连接关闭
xhr.onloadend = () => {
console.log('SSE connection closed');
};
}
// 启动 SSE
startSSE();
3. Fetch 实现接入
Fetch与XMLHttpRequest实现同理,来模拟 SSE 的行为。
下面是简单的demo:
async function downloadFile() {
try {
const response = await fetch('https://example.com/large-file');
const reader = response.body.getReader();
let receivedLength = 0;
const totalLength = response.headers.get('content - length');
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
receivedLength += value.length;
const progress = (receivedLength / totalLength * 100).toFixed(2);
console.log(`Download progress: ${progress}%`);
}
} catch (error) {
console.error('Error:', error);
}
}
最终方案
由于需要进行post请求,而没有基于XMLHttpRequest封装的类库,于是使用基于fetch封装的类库,微软开源的 @
microsoft/fetch-event-source请求库,该库基于fetch进行了封装。在此基础上,我对其进行了二次封装,主要目的是增强请求的可控性,并提升错误处理的健壮性。具体封装功能如下:
- 请求取消机制:实现了对请求的主动取消功能,确保在需要时能够及时中断请求,避免不必要的资源消耗。
- 错误捕获与处理:对各类错误进行了统一的捕获与处理,涵盖以下场景:
- 网络错误:当网络连接异常或请求无法发送时,捕获并处理相关错误。
- 服务器错误:针对服务器返回的5xx或4xx状态码进行错误处理。
- 网络连接错误:在网络连接中断或请求超时时,进行相应的错误捕获。
- 流式接口超时错误:在请求流式数据的过程中,若出现超时情况,能够及时捕获并处理。
- JSON解析错误:对返回的数据进行JSON解析时,若解析失败,能够捕获并处理异常。
通过上述封装,显著提升了请求的可靠性与稳定性,确保在各种异常情况下能够进行有效的错误处理与恢复。
安装:
npm i --save @lesliechueng/stream-fetch-manage
使用示例:
import { StreamFetchClient } from '@lesliechueng/stream-fetch-manage'
const streamFetchApp = new StreamFetchClient(
{
baseUrl: "",
headers: {
"Content-Type": "application/json",
},
overErrorTimer: 60 * 1000, // 流式中间超时时间,单位为毫秒
},
{
onMessage: (_data) => {
// 处理流式消息
},
onClose: (_lastData: any) => {
// 处理关闭时的回调
},
onServerError: (_lastData: any) => {
// 处理服务器错误时的回调
},
onStreamConnectionError: (_lastData: any) => {
// 处理流式中间超时错误时的回调
},
onConnectionError: (_lastData: any) => {
// 处理连接错误时的回调
},
onParseError: (_lastData: any) => {
// 处理 JSON解析错误时的回调
},
}
);
// 开始发起请求,下面是具体的参数
streamFetchApp.sendStreamRequest({
// 流式中间请求参数
});
// 暂停请求
// streamFetchApp.disconnect();
- 解决网络抖动问题:通过在请求中设置缓存时间和 data个数。
import { StreamFetchClient } from '@lesliechueng/stream-fetch-manage'
const streamFetchApp1 = new StreamFetchClient(
{
baseUrl: "",
headers: {
"Content-Type": "application/json",
},
overErrorTimer: 60 * 1000, // 流式中间超时时间,单位为毫秒
},
{
onMessage: (_data) => {
// 调用消息处理器处理消息
console.log(_data);
},
onClose: () => {
// 处理关闭时的回调
},
onServerError: () => {
// 处理服务器错误时的回调
},
onStreamConnectionError: () => {
// 处理流式中间超时错误时的回调
},
onConnectionError: () => {
// 处理连接错误时的回调
},
onParseError: () => {
// 处理 JSON解析错误时的回调
},
},
{
maxCacheSize: 6, // 最大缓存大小,单位为条
cacheTimeout: 5000, // 缓存超时时间,单位为毫秒
expectedSeq: 0, // 期望的消息索引值
handleValidateMessageFormat: (data: any) => {
// 校验消息格式的回调
if (typeof data.seq !== "number") {
throw new Error("Message must have a numeric seq field");
}
},
getIndexValue: (data: any) => data.seq, // 使得消息处理器获取消息索引值
}
);
// 开始发起请求,下面是具体的参数
streamFetchApp1.sendStreamRequest({
// 流式中间请求参数
});