浮头导航网

专注编程技术分享的开发者社区

通过spring-boot自带的SseEmitter实现AI会话

spring-boot-sse-demo

通过spring-boot自带的SseEmitter实现AI会话。后端异步请求大模型接口,通过SSE协议推送到前端。前端通过EventSourced流式接收消息并渲染实现打字机效果。

代码要点

POM依赖

 <dependencys>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
 </dependencys>

后端代码

  • 接口配置RequestMapping的produces要声明text/event-stream
  • 接口返回值为SseEmitter
  • emitter响应的消息数据的格式最好是json格式,只有json格式才能支持复杂消息返回,不然假如消息有换行会和sse协议的换行冲突
 @Controller
 @RequestMapping("/ssedemo")
 public class SseDemoController {
     @RequestMapping(value = "/chatstream",produces = "text/event-stream;charset=utf-8")
     public SseEmitter chatstream(@RequestParam Map param) {
         SseEmitter emitter = new SseEmitter(60_000L);
         new Thread(() -> {
             Map<String, String> userMessage = new HashMap<>();
             userMessage.put("role", "user");
             userMessage.put("content", param.get("q").toString());
             
             Map<String,Object> data=new HashMap<>();
             data.put("model", "Qwen/Qwen3-8B");
             data.put("stream",true);
             data.put("messages", List.of(userMessage));
             
             String CHART_URL="https://api.siliconflow.cn/v1/chat/completions";
             InputStream bodyStream = HttpRequest.post(CHART_URL)
                     .contentType("application/json")
                     .header("Accept", "text/event-stream")
                     .header("Authorization", "Bearer " + "sk-")
                     .body(JSONUtil.toJsonStr(data))
                     .execute(true).bodyStream();
             AtomicLong count = new AtomicLong();
             try (BufferedReader br = new BufferedReader(new InputStreamReader(bodyStream))) {
                 String line;
                 while ((line = br.readLine()) != null) {
                     log.info("响应: {}", line);
                     //消息结束
                     if(line.equals("data: [DONE]")){
                         emitter.send(SseEmitter.event()
                                 .id("ssedemo-"+count.incrementAndGet()).data(
                                         JSONUtil.toJsonStr(new AIRequest.Result(true, false, "[DONE]", null,"content"))));
                         emitter.complete();
                         return;
                     }
                     //SSE通过\n\n区分事件消息,所以空行过滤掉,只接收JSON消息
                     if(!line.startsWith("data:")){
                         continue;
                     }
                     JSONObject jsonObject = JSONUtil.parseObj(line.substring(6));
                     JSONObject choices = jsonObject.getJSONArray("choices").getJSONObject(0);
                     JSONObject delta = choices.getJSONObject("delta");
                     String content = delta.getStr("content");
                     String reasoning_content = delta.getStr("reasoning_content");
                     if(StrUtil.isNotEmpty(reasoning_content)) {
                         emitter.send(SseEmitter.event()
                                 .id("ssedemo-"+count.incrementAndGet()).data(
                                         JSONUtil.toJsonStr(new AIRequest.Result(false, false, reasoning_content, null,"reasoning"))));
                     }
                     if(StrUtil.isNotEmpty(content)) {
                         emitter.send(SseEmitter.event()
                                 .id("ssedemo-"+count.incrementAndGet()).data(
                                         JSONUtil.toJsonStr(new AIRequest.Result(false, false, content, null,"content"))));
                     }
                 }
             } catch (IOException e) {
                 log.error("流式请求失败", e);
                 try {
                     emitter.send(SseEmitter.event()
                             .id("ssedemo-"+count.incrementAndGet()).data(
                                     JSONUtil.toJsonStr(new AIRequest.Result(false, true, null, e.getMessage(),"content"))));
                     emitter.complete();
                 } catch (IOException ex) {
                 }
             }
             
         }).start();
         emitter.onCompletion(() -> System.out.println("onCompletion"));
         return emitter;
     }
 }

前端代码

  • 重点在EventSource在onerror时要close掉,不能自动重连
 function sendMessage() {
         const input = document.getElementById('messageInput');
         const message = input.value.trim();
 
         if (message) {
             const chatMessages = document.getElementById('chatMessages');
 
             // 添加用户消息
             const userMessage = document.createElement('div');
             userMessage.className = 'message user-message';
             userMessage.textContent = message;
             chatMessages.appendChild(userMessage);
 
             // 清空输入框
             input.value = '';
 
             // 滚动到底部
             chatMessages.scrollTop = chatMessages.scrollHeight;
             // 显示"正在输入"指示器
             var typingIndicator = document.getElementById('typingIndicator');
             if(!typingIndicator){
                 typingIndicator=clonedElement.cloneNode(true);
                 chatMessages.appendChild(clonedElement);
             }else{
                 chatMessages.removeChild(typingIndicator);
                 chatMessages.appendChild(typingIndicator);
             }
             typingIndicator.style.display = 'flex';
 
             if(eventSource) eventSource.close();
             eventSource = new EventSource('/ssedemo/chatstream?q='+message);
             var messageSpan=undefined;
             var isconstent=false;
             eventSource.onmessage = function (event) {
                 const data = JSON.parse(event.data);
                 if(data.type=='reasoning'){
                     return;
                 }
                 console.log(data);
                 //第一次 接收到消息时,隐藏"正在输入"指示器
                 if(typingIndicator.style.display == 'flex'){
                     typingIndicator.style.display = 'none';
                     // 创建AI消息元素
                     const aiMessage = document.createElement('div');
                     aiMessage.className = 'message ai-message';
 
                     messageSpan = document.createElement('span');
                     aiMessage.appendChild(messageSpan);
 
                     chatMessages.appendChild(aiMessage);
 
                     // 滚动到底部
                     chatMessages.scrollTop = chatMessages.scrollHeight;
                 }
                 if (data.content === '[DONE]') {
                     console.log('[DONE]');
                     return;
                 }
                 if(data.type=='content'&&!isconstent){
                     isconstent=true;
                     messageSpan.innerHTML += "\n";
                 }
                 messageSpan.innerHTML += data.content;
                 // 滚动到底部
                 chatMessages.scrollTop = chatMessages.scrollHeight;
             };
             eventSource.onerror = function (error) {
                 console.error('Error:', error);
                 eventSource.close();
             };
         }
     }



控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言