crush-level-web/src/utils/streamParser.ts

77 lines
1.9 KiB
TypeScript

export type SSEHandler = (event: string, data: string) => void;
export const parseData = (data: string): any => {
try {
return JSON.parse(data);
} catch (error) {
return data;
}
};
/**
* 处理 SSE 流的通用函数
* @param response fetch 返回的 Response 对象
* @param onMessage 收到消息时的回调函数
*/
export async function parseSSEStream(response: Response, onMessage: SSEHandler) {
if (!response.body) {
throw new Error('Response body is empty');
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
try {
while (true) {
const { done, value } = await reader.read();
// 1. 解码新收到的数据并追加到 buffer
if (value) {
buffer += decoder.decode(value, { stream: !done });
}
// 2. 按标准 SSE 分隔符 \n\n 切分消息
const parts = buffer.split('\n\n');
// 3. 最后一个部分通常是不完整的,留到下一次处理
// 但如果流已经结束(done=true),那么剩下的所有内容都必须强制处理
buffer = parts.pop() || '';
if (done && buffer.trim()) {
parts.push(buffer);
buffer = '';
}
// 4. 解析切分出来的每一条完整消息
for (const part of parts) {
if (!part.trim()) continue;
const lines = part.split('\n');
let event = '';
let data = '';
for (const line of lines) {
if (line.startsWith('event:')) {
event = line.slice(6).trim();
} else if (line.startsWith('data:')) {
const lineData = line.slice(5);
data = lineData;
}
}
if (data) {
onMessage(event, data);
}
}
if (done) break;
}
} catch (error) {
console.error('Stream parsing error:', error);
throw error; // 继续抛出,让调用者处理
} finally {
reader.releaseLock();
}
}