网站设计师要求,石家庄招聘网最新招聘,企业平台网站制作,营销型网站建设优势背景 接上一篇《LLM大模型统一封装接口解决方案》架构确定后#xff0c;流式方案非常规请求#xff0c;需要特殊处理。 本解决方案就是针对上一篇中所需要的流式#xff08;打字机效果进行编码#xff09; 什么是SSE SSE#xff08;Server-Sent Events#xff0c;服务器发…背景 接上一篇《LLM大模型统一封装接口解决方案》架构确定后流式方案非常规请求需要特殊处理。 本解决方案就是针对上一篇中所需要的流式打字机效果进行编码 什么是SSE SSEServer-Sent Events服务器发送事件是一种基于HTTP的服务器到客户端的单向通信技术用于实现服务器向客户端推送数据的功能。SSE协议标准由HTML5规范定义并且其定义被包含在HTML Living Standard中。 SSE允许服务器通过HTTP连接向客户端发送数据而无需客户端发起请求。这使得SSE非常适合于实时通信或推送通知给客户端的应用程序例如实时股票报价、即时通讯、实时监控等场景。 基本上SSE由以下要素组成 服务器负责向客户端发送事件流的HTTP服务器。客户端通过浏览器中的EventSource API与服务器建立连接接收服务器发送的事件。事件流Event Stream服务器向客户端发送的数据流格式为纯文本使用一种特定的格式进行编码例如MIME类型为text/event-stream。 SSE的优点包括简单易用、实现方便、跨浏览器支持良好等。然而它也有一些限制例如不能支持双向通信与WebSocket相比SSE的实时性稍逊一筹。 Java框架说明 pom 文件引入的核心依赖包 ?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.0/versionrelativePath/ !-- lookup parent from repository --/parentgroupIdaip.com/groupIdartifactIdaip-com/artifactIdversion0.0.1/versionnameaip-com/namedescriptionaip com project for Spring Boot/descriptionpropertiesjava.version1.8/java.version/propertiesdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-webflux/artifactId/dependencydependencygroupIdio.reactivex.rxjava2/groupIdartifactIdrxjava/artifactId/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project
Java后端核心代码 本方法是标准的SSE协议标准 private final ExecutorService executorService Executors.newFixedThreadPool(5);/*** 会话请求** return String*/PostMapping(value /completions, consumes MediaType.APPLICATION_JSON_VALUE)Operation(summary 会话请求)public SseEmitter completions(RequestBody CompletionRequest completionRequest) {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter new SseEmitter();executorService.execute(() - {try {for (int i 0; i 10; i) {// 向客户端发送事件emitter.send(SseEmitter.event().name(message).data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;/*** 会话请求** return String*/GetMapping(value /stream)Operation(summary 会话请求)public SseEmitter stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);SseEmitter emitter new SseEmitter();executorService.execute(() - {try {for (int i 0; i 10; i) {// 向客户端发送事件emitter.send(SseEmitter.event().name(message).data(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build())));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}});return emitter;Flux 和 Flowable 对比 Flux 和 Flowable 都是响应式编程库中的数据流类型用于处理异步和基于事件的流式数据。它们分别来自于不同的库Flux 是 Reactor 库的一部分而 Flowable 则是 RxJava 库的一部分。以下是它们之间的一些区别 库的来源 Flux 来自于 Reactor 库是 Reactor 的核心组件之一React的核心模块用于基于反应式流规范处理数据流。Flowable 来自于 RxJava 库是 RxJava 的核心类之一RxJava 是 Java 平台的反应式扩展库用于处理异步和基于事件的编程。 背压策略 Flux 默认采用背压策略为 BUFFER可以通过 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等方法来指定不同的背压策略。Flowable 默认也是支持背压的但是相比 FluxFlowable 提供了更多的背压策略如 BUFFER、DROP、LATEST、ERROR、MISSING。 反应式规范 Flux 遵循 Reactor 库的反应式流规范使用 Mono 和 Flux 来表示异步流和单个结果。Flowable 遵循 RxJava 库的反应式流规范使用 Observable 和 Flowable 来表示异步流和单个结果。 生态系统 Reactor 生态系统主要用于基于 Reactor 的应用程序。RxJava 生态系统则更广泛它是 ReactiveX 的一部分支持多种语言和平台并有许多衍生项目。 总的来说Flux 和 Flowable 在概念上很相似都用于处理异步和基于事件的流式数据但它们来自于不同的库并且有一些细微的区别如背压策略和生态系统支持。您可以根据项目需求选择适合的库和数据流类型。 Java后端Flowable方式 本方法是Flowable方式非标准流式规则 /*** 会话请求** return String*/GetMapping(value /stream)Operation(summary 会话请求)public FlowableString stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);FlowableString typingFlow Flowable.create(emitter - {executorService.execute(() - {try {for (int i 0; i 10; i) {emitter.onNext(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.onComplete();} catch (Exception e) {}});}, BackpressureStrategy.BUFFER);return typingFlow;}Java后端Flux方式 本方法是Flux方式非标准流式规则 /*** 会话请求** return String*/GetMapping(value /stream)Operation(summary 会话请求)public FluxString stream() {response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);FluxString typingFlow Flux.create(emitter - {executorService.execute(() - {try {for (int i 0; i 10; i) {emitter.next(JsonHelper.toJSONString(new StreamCompletionResult.Builder().ended(false).message(String.valueOf(i)).build()));Thread.sleep(1000);}emitter.complete();} catch (Exception e) {}});}, FluxSink.OverflowStrategy.BUFFER);return typingFlow;}
}HTML 客户端接收示例程序 function EventSourceGetRequest() SSE 默认方法只支持GET请求适合演示用途以及后端包装好服务 function fetchPostRequest() fetch POST 请求实现SSE支持所有请求(POST,GET等)以及传递参数 sse.html 内容 !DOCTYPE html
html langen
headmeta charsetUTF-8titleSEE Example/titlescript// SSE 默认方法只支持GET请求function EventSourceGetRequest() {if(typeof(EventSource)!undefined){var eventSource new EventSource(http://127.0.0.1:8090/v1/chat/stream);eventSource.onmessage function(event){document.getElementById(result).insertAdjacentHTML(beforeend, ${event.data}br/br/);console.log(event)};}else{document.getElementById(result).innerHTML抱歉你的浏览器不支持 server-sent 事件...;}}// fetch POST 请求实现SSEfunction fetchPostRequest() {fetch(http://127.0.0.1:8090/v1/chat/completions, {method: POST,headers: {Content-Type: application/json},body: JSON.stringify({}),}).then(response {// 检查响应是否成功if (!response.ok) {throw new Error(Network response was not ok);}// 返回 ReadableStream 对象return response.body;}).then(stream {// 创建一个新的文本解码器const decoder new TextDecoder();// 获取一个 reader 对象const reader stream.getReader();let chunk // 逐块读取数据function read() {reader.read().then(({ done, value }) {if (done) {document.getElementById(result).insertAdjacentHTML(beforeend, ${chunk}hr/);console.log(Stream has ended);return;}// 将数据块转换为字符串并显示const tmp decoder.decode(value, { stream: true });if (tmp.startsWith(event:) chunk!) {document.getElementById(result).insertAdjacentHTML(beforeend, ${chunk}hr/);chunk tmp}else{chunk chunk tmp}// 继续读取下一块数据read();});}// 开始读取数据read();}).catch(error {// 处理错误console.error(There was a problem with the fetch operation:, error);});}// EventSourceGetRequest();fetchPostRequest();/script
/head
bodyh1SEE result/h1div idresult/div
/body
/html标准SSE示例 扩展SSE