7.1 流式处理基础
7.1.1 流式处理概念
流式处理(Streaming)是现代AI应用的重要特性,它允许AI模型逐步生成响应内容,而不是等待完整响应后一次性返回。这种方式可以显著改善用户体验,特别是在生成长文本时。
核心优势:
1. 实时反馈:用户可以立即看到AI开始生成的内容
2. 降低感知延迟:缩短首个内容片段到达客户端的时间(首字节时间,TTFB),无需等待完整响应生成完毕
3. 改善体验:类似打字机效果,交互更自然
4. 资源优化:服务端无需先缓冲完整响应再发送,降低内存占用
7.1.2 Spring AI流式架构
// StreamingChatModel接口
/**
 * Streaming chat contract used throughout this chapter.
 *
 * <p>NOTE(review): this is the chapter's own simplified interface (it declares
 * {@code streamContent} and extends {@code ChatModel}); confirm it against the Spring AI
 * version actually in use before relying on the hierarchy.
 */
public interface StreamingChatModel extends ChatModel {

    /**
     * Streams the model's reply to {@code prompt} as a sequence of partial responses.
     *
     * @param prompt the prompt to send
     * @return a {@link Flux} of partial {@link ChatResponse} chunks
     */
    Flux<ChatResponse> stream(Prompt prompt);

    /**
     * Convenience variant of {@link #stream(Prompt)} that emits only the text of each chunk.
     *
     * <p>Chunks that carry no result, no output, or no text are skipped rather than mapped
     * to {@code null}: Reactor's {@code map} rejects {@code null} and would fail the whole
     * stream with a {@link NullPointerException}.
     *
     * @param message plain user message, wrapped in a {@link Prompt}
     * @return a {@link Flux} of non-null text fragments in arrival order
     */
    default Flux<String> streamContent(String message) {
        return stream(new Prompt(message))
                .mapNotNull(StreamingChatModel::extractContent);
    }

    /**
     * Returns the text of one streamed chunk, or {@code null} if the chunk has none.
     */
    private static String extractContent(ChatResponse response) {
        if (response.getResult() == null || response.getResult().getOutput() == null) {
            return null;
        }
        return response.getResult().getOutput().getContent();
    }
}
// 流式响应处理器
package com.example.springai.streaming;
import org.springframework.ai.chat.model.ChatResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import java.util.function.Consumer;
/**
 * Fans one streamed chat response out to two multicast sinks: one carrying the raw
 * {@link ChatResponse} chunks and one carrying only their non-empty text.
 *
 * <p>NOTE(review): both sinks are {@code multicast().onBackpressureBuffer()} sinks, so a
 * listener registered after elements were already delivered to another subscriber will
 * not see those elements; register listeners before calling
 * {@link #handleStreamingResponse(Flux)}.
 */
public class StreamingResponseHandler {
    private final Sinks.Many<String> contentSink;
    private final Sinks.Many<ChatResponse> responseSink;

    public StreamingResponseHandler() {
        this.contentSink = Sinks.many().multicast().onBackpressureBuffer();
        this.responseSink = Sinks.many().multicast().onBackpressureBuffer();
    }

    /**
     * Subscribes to {@code responseStream} and forwards every signal to both sinks.
     *
     * <p>Explicit error and completion handlers are passed to {@code subscribe} so that an
     * upstream error is delivered to the sinks only, rather than also being reported by
     * Reactor as an unhandled error on a handler-less {@code subscribe()}.
     *
     * @param responseStream the streamed chat response to relay
     */
    public void handleStreamingResponse(Flux<ChatResponse> responseStream) {
        responseStream.subscribe(
                this::emitChunk,
                error -> {
                    contentSink.tryEmitError(error);
                    responseSink.tryEmitError(error);
                },
                () -> {
                    contentSink.tryEmitComplete();
                    responseSink.tryEmitComplete();
                });
    }

    /**
     * Returns the stream of non-empty text fragments.
     */
    public Flux<String> getContentStream() {
        return contentSink.asFlux();
    }

    /**
     * Returns the stream of raw response chunks.
     */
    public Flux<ChatResponse> getResponseStream() {
        return responseSink.asFlux();
    }

    /**
     * Registers a listener for each text fragment.
     *
     * <p>Errors are ignored here (they are reported via {@link #onError(Consumer)}); the
     * explicit no-op error handler keeps Reactor from logging them as unhandled.
     */
    public void onContent(Consumer<String> contentConsumer) {
        getContentStream().subscribe(contentConsumer, error -> { });
    }

    /**
     * Registers a callback run when the content stream completes normally.
     */
    public void onComplete(Runnable completeCallback) {
        getContentStream().subscribe(content -> { }, error -> { }, completeCallback);
    }

    /**
     * Registers a listener for a terminal error of the content stream.
     */
    public void onError(Consumer<Throwable> errorConsumer) {
        getContentStream().subscribe(content -> { }, errorConsumer);
    }

    /**
     * Forwards one chunk to the response sink and, if it carries non-empty text,
     * that text to the content sink.
     */
    private void emitChunk(ChatResponse response) {
        responseSink.tryEmitNext(response);
        String content = extractContent(response);
        if (content != null && !content.isEmpty()) {
            contentSink.tryEmitNext(content);
        }
    }

    /**
     * Null-safe text extraction; a chunk may lack a result or output.
     */
    private static String extractContent(ChatResponse response) {
        if (response.getResult() == null || response.getResult().getOutput() == null) {
            return null;
        }
        return response.getResult().getOutput().getContent();
    }
}
7.2 流式聊天服务
7.2.1 基础流式服务
// StreamingChatService.java
package com.example.springai.service;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.StreamingChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.chat.prompt.PromptTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Text-level streaming chat operations built on {@link StreamingChatModel}.
 *
 * <p>Streams emit text fragments in arrival order. Only {@code null} and empty fragments
 * are dropped; whitespace-only fragments (such as {@code " "} or {@code "\n\n"}) are kept,
 * because they are part of the generated text and dropping them would glue words together
 * and erase paragraph breaks.
 */
@Service
public class StreamingChatService {
    private final StreamingChatModel streamingChatModel;

    public StreamingChatService(StreamingChatModel streamingChatModel) {
        this.streamingChatModel = streamingChatModel;
    }

    /**
     * Streams the reply to a plain user message.
     *
     * @param message the user message
     * @return non-empty text fragments of the reply
     */
    public Flux<String> streamChat(String message) {
        return streamingChatModel.streamContent(message)
                .filter(StreamingChatService::hasContent);
    }

    /**
     * Renders {@code template} with {@code variables} and streams the reply.
     *
     * @param template  prompt template text
     * @param variables values substituted into the template
     * @return non-empty text fragments of the reply
     */
    public Flux<String> streamChatWithTemplate(String template, Map<String, Object> variables) {
        Prompt prompt = new PromptTemplate(template).create(variables);
        // mapNotNull: Reactor's map() rejects null, and a chunk may carry no text.
        return streamingChatModel.stream(prompt)
                .mapNotNull(StreamingChatService::extractContent)
                .filter(StreamingChatService::hasContent);
    }

    /**
     * Streams the reply, pairing each fragment with all text received so far.
     *
     * <p>The buffer is created per subscription (via {@link Flux#defer}) so re-subscribing,
     * e.g. on retry, starts empty instead of appending to a previous attempt's text.
     *
     * @param message the user message
     * @return each fragment together with the accumulated text up to and including it
     */
    public Flux<AccumulatedResponse> streamChatWithAccumulation(String message) {
        return Flux.defer(() -> {
            StringBuilder accumulated = new StringBuilder();
            return streamChat(message).map(chunk -> {
                accumulated.append(chunk);
                return new AccumulatedResponse(chunk, accumulated.toString());
            });
        });
    }

    /**
     * Streams the reply, replacing the rest of the stream with a single notice when the
     * next fragment does not arrive within {@code timeout}.
     *
     * <p>Only the {@link java.util.concurrent.TimeoutException} raised by {@code timeout}
     * is converted; other errors propagate instead of being mislabelled as timeouts.
     *
     * @param message the user message
     * @param timeout maximum wait for each fragment
     * @return the reply fragments, possibly followed by a timeout notice
     */
    public Flux<String> streamChatWithTimeout(String message, Duration timeout) {
        return streamChat(message)
                .timeout(timeout)
                .onErrorResume(java.util.concurrent.TimeoutException.class,
                        timeoutError -> Flux.just("[超时] 响应时间过长,请稍后重试"));
    }

    /**
     * Streams the reply and concatenates all fragments into one string.
     *
     * @param message the user message
     * @return the full reply text
     */
    public Mono<String> streamChatAndCollect(String message) {
        return streamChat(message)
                .collectList()
                .map(chunks -> String.join("", chunks));
    }

    /**
     * Streams a reply in the context of earlier turns, flattened into a single
     * user/assistant transcript prompt. A {@code null} history is treated as empty.
     *
     * @param request conversation id, current message and prior turns
     * @return reply fragments tagged with the conversation id and an emission timestamp
     */
    public Flux<ConversationResponse> streamConversation(ConversationRequest request) {
        StringBuilder conversationHistory = new StringBuilder();
        // Replay previous turns so the model sees the whole conversation.
        if (request.history() != null) {
            request.history().forEach(turn -> {
                conversationHistory.append("用户: ").append(turn.userMessage()).append("\n");
                conversationHistory.append("助手: ").append(turn.assistantMessage()).append("\n");
            });
        }
        // Append the current message and leave an open assistant turn for the model to complete.
        conversationHistory.append("用户: ").append(request.currentMessage()).append("\n");
        conversationHistory.append("助手: ");
        return streamChat(conversationHistory.toString())
                .map(chunk -> new ConversationResponse(
                        request.conversationId(),
                        chunk,
                        System.currentTimeMillis()
                ));
    }

    /**
     * True for non-null, non-empty fragments; whitespace-only fragments are kept.
     */
    private static boolean hasContent(String content) {
        return content != null && !content.isEmpty();
    }

    /**
     * Null-safe text extraction from one streamed chunk.
     */
    private static String extractContent(ChatResponse response) {
        if (response.getResult() == null || response.getResult().getOutput() == null) {
            return null;
        }
        return response.getResult().getOutput().getContent();
    }

    /**
     * One streamed fragment plus all text accumulated so far.
     */
    public record AccumulatedResponse(
            String chunk,
            String accumulated
    ) {}

    /**
     * Multi-turn conversation request.
     */
    public record ConversationRequest(
            String conversationId,
            String currentMessage,
            java.util.List<ConversationTurn> history
    ) {}

    /**
     * One completed user/assistant exchange.
     */
    public record ConversationTurn(
            String userMessage,
            String assistantMessage
    ) {}

    /**
     * One streamed fragment of a conversation reply.
     */
    public record ConversationResponse(
            String conversationId,
            String chunk,
            long timestamp
    ) {}
}
7.2.2 高级流式处理
// AdvancedStreamingService.java
package com.example.springai.service;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.StreamingChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
 * Streaming patterns layered on {@link StreamingChatModel#streamContent(String)}: rate
 * limiting, batching, retry, parallel fan-out, monitoring, conditional cut-off and
 * per-fragment transformation.
 *
 * <p>Only {@code null} and empty fragments are filtered out. Whitespace-only fragments
 * are part of the generated text (spaces, line breaks) and are kept.
 */
@Service
public class AdvancedStreamingService {
    private final StreamingChatModel streamingChatModel;

    public AdvancedStreamingService(StreamingChatModel streamingChatModel) {
        this.streamingChatModel = streamingChatModel;
    }

    /**
     * Streams the reply, delaying each emitted fragment by {@code interval}.
     *
     * <p>Fragments are filtered before the delay so dropped fragments add no latency.
     */
    public Flux<String> streamWithRateLimit(String message, Duration interval) {
        return streamingChatModel.streamContent(message)
                .filter(AdvancedStreamingService::hasContent)
                .delayElements(interval);
    }

    /**
     * Streams the reply in groups of up to {@code batchSize} fragments, numbered from 1.
     *
     * <p>{@code index} numbers batches per subscription, so a re-subscription restarts at 1
     * rather than continuing a counter shared across subscriptions.
     */
    public Flux<BatchResponse> streamInBatches(String message, int batchSize) {
        return streamingChatModel.streamContent(message)
                .filter(AdvancedStreamingService::hasContent)
                .buffer(batchSize)
                .index((index, batch) -> new BatchResponse(
                        (int) (index + 1),
                        batch,
                        String.join("", batch)
                ));
    }

    /**
     * Streams the reply, resubscribing up to {@code maxRetries} times on error; if every
     * attempt fails, emits a single error notice instead of propagating the error.
     *
     * <p>NOTE(review): if the underlying stream is re-executed on resubscription, fragments
     * emitted before a failure are emitted again by the retry. Consider retrying only
     * before the first fragment when consumers render incrementally.
     */
    public Flux<String> streamWithRetry(String message, int maxRetries) {
        return streamingChatModel.streamContent(message)
                .retry(maxRetries)
                .onErrorResume(throwable ->
                        Flux.just("[错误] 处理失败: " + throwable.getMessage()));
    }

    /**
     * Sends each message on a parallel rail and emits one collected reply per message.
     *
     * <p>The reported thread name is captured when the rail's worker starts the request;
     * inside the later {@code map} the current thread is whichever thread completed the
     * model stream, which is not the rail worker.
     */
    public Flux<ParallelResponse> streamParallel(java.util.List<String> messages) {
        return Flux.fromIterable(messages)
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(message -> {
                    String workerThread = Thread.currentThread().getName();
                    return streamingChatModel.streamContent(message)
                            .collectList()
                            .map(chunks -> new ParallelResponse(
                                    message,
                                    String.join("", chunks),
                                    workerThread
                            ));
                })
                .sequential();
    }

    /**
     * Streams the reply with running statistics attached to each fragment.
     *
     * <p>Start time and counters are created per subscription (via {@link Flux#defer}), so
     * elapsed time is measured from subscription rather than from when this method was
     * called, and counters do not carry over between subscriptions.
     */
    public Flux<MonitoredResponse> streamWithMonitoring(String message) {
        return Flux.defer(() -> {
            long startTime = System.currentTimeMillis();
            AtomicInteger chunkCount = new AtomicInteger(0);
            AtomicInteger totalLength = new AtomicInteger(0);
            return streamingChatModel.streamContent(message)
                    .filter(AdvancedStreamingService::hasContent)
                    .map(chunk -> {
                        int count = chunkCount.incrementAndGet();
                        int length = totalLength.addAndGet(chunk.length());
                        long elapsed = System.currentTimeMillis() - startTime;
                        return new MonitoredResponse(
                                chunk,
                                count,
                                length,
                                elapsed,
                                calculateThroughput(length, elapsed)
                        );
                    });
        });
    }

    /**
     * Streams fragments while {@code condition} returns {@code true}; the first fragment
     * for which it returns {@code false} or {@code null} ends the stream.
     */
    public Flux<String> streamWithCondition(String message,
                                            Function<String, Boolean> condition) {
        return streamingChatModel.streamContent(message)
                .filter(AdvancedStreamingService::hasContent)
                // Boolean.TRUE.equals avoids an NPE when the function returns null.
                .takeWhile(content -> Boolean.TRUE.equals(condition.apply(content)));
    }

    /**
     * Streams each fragment together with {@code transformer}'s result for it.
     */
    public Flux<TransformedResponse> streamWithTransformation(String message,
                                                              Function<String, String> transformer) {
        return streamingChatModel.streamContent(message)
                .filter(AdvancedStreamingService::hasContent)
                .map(chunk -> new TransformedResponse(
                        chunk,
                        transformer.apply(chunk)
                ));
    }

    /**
     * Characters per second; 0 when no measurable time has elapsed.
     */
    private double calculateThroughput(int totalLength, long elapsedMs) {
        if (elapsedMs <= 0) {
            return 0.0;
        }
        return (double) totalLength / elapsedMs * 1000;
    }

    /**
     * True for non-null, non-empty fragments; whitespace-only fragments are kept.
     */
    private static boolean hasContent(String content) {
        return content != null && !content.isEmpty();
    }

    /** One numbered batch of fragments and their concatenation. */
    public record BatchResponse(
            int batchNumber,
            java.util.List<String> chunks,
            String content
    ) {}

    /** Collected reply for one message of a parallel request. */
    public record ParallelResponse(
            String originalMessage,
            String response,
            String threadName
    ) {}

    /** One fragment with running statistics. */
    public record MonitoredResponse(
            String chunk,
            int chunkCount,
            int totalLength,
            long elapsedMs,
            double throughput
    ) {}

    /** One fragment and its transformed form. */
    public record TransformedResponse(
            String original,
            String transformed
    ) {}
}
7.3 异步编程模式
7.3.1 异步聊天服务
// AsyncChatService.java
package com.example.springai.service;
import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
public class AsyncChatService {
private final ChatModel chatModel;
private final Executor asyncExecutor;
public AsyncChatService(ChatModel chatModel) {
this.chatModel = chatModel;
this.asyncExecutor = Executors.newFixedThreadPool(10);
}
/**
* 异步聊天
*/
@Async
public CompletableFuture<String> chatAsync(String message) {
return CompletableFuture.supplyAsync(() -> {
try {
ChatResponse response = chatModel.call(new Prompt(message));
return response.getResult().getOutput().getContent();
} catch (Exception e) {
throw new RuntimeException("异步聊天失败: " + e.getMessage(), e);
}
}, asyncExecutor);
}
/**
* 批量异步聊天
*/
public CompletableFuture<List<AsyncChatResult>> chatBatchAsync(List<String> messages) {
List<CompletableFuture<AsyncChatResult>> futures = messages.stream()
.map(message ->
chatAsync(message)
.thenApply(response -> new AsyncChatResult(message, response, true, null))
.exceptionally(throwable -> new AsyncChatResult(
message, null, false, throwable.getMessage()))
)
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
/**
* 带超时的异步聊天
*/
public CompletableFuture<String> chatAsyncWithTimeout(String message, long timeoutMs) {
CompletableFuture<String> chatFuture = chatAsync(message);
CompletableFuture<String> timeoutFuture = new CompletableFuture<>();
// 设置超时
CompletableFuture.delayedExecutor(timeoutMs, java.util.concurrent.TimeUnit.MILLISECONDS)
.execute(() -> timeoutFuture.completeExceptionally(
new java.util.concurrent.TimeoutException("聊天请求超时")));
return chatFuture.applyToEither(timeoutFuture, Function.identity());
}
/**
* 异步聊天链
*/
public CompletableFuture<String> chatChainAsync(List<String> messages) {
CompletableFuture<String> result = CompletableFuture.completedFuture("");
for (String message : messages) {
result = result.thenCompose(previousResponse -> {
String combinedMessage = previousResponse.isEmpty() ?
message : previousResponse + "\n\n" + message;
return chatAsync(combinedMessage);
});
}
return result;
}
/**
* 条件异步聊天
*/
public CompletableFuture<String> chatAsyncWithCondition(
String message,
java.util.function.Predicate<String> condition) {
return chatAsync(message)
.thenCompose(response -> {
if (condition.test(response)) {
return CompletableFuture.completedFuture(response);
} else {
// 如果条件不满足,重新生成
return chatAsync(message + "\n请重新生成更合适的回答。");
}
});
}
/**
* 异步聊天结果合并
*/
public CompletableFuture<MergedChatResult> chatMergeAsync(List<String> messages) {
return chatBatchAsync(messages)
.thenApply(results -> {
List<String> successResponses = results.stream()
.filter(AsyncChatResult::success)
.map(AsyncChatResult::response)
.collect(Collectors.toList());
List<String> errors = results.stream()
.filter(result -> !result.success())
.map(AsyncChatResult::error)
.collect(Collectors.toList());
return new MergedChatResult(
String.join("\n\n", successResponses),
successResponses.size(),
errors.size(),
errors
);
});
}
// 结果记录类
public record AsyncChatResult(
String message,
String response,
boolean success,
String error
) {}
public record MergedChatResult(
String mergedResponse,
int successCount,
int errorCount,
List<String> errors
) {}
}
7.3.2 响应式编程集成
// ReactiveChatService.java
package com.example.springai.service;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.StreamingChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Reactor-based chat operations: single calls, batches, streams, retry, timeout,
 * caching, aggregation and error recovery.
 */
@Service
public class ReactiveChatService {
    /** How long a successful reply stays cached by {@link #chatReactiveWithCache(String)}. */
    private static final Duration CACHE_TTL = Duration.ofMinutes(5);

    private final StreamingChatModel streamingChatModel;
    /** Cached reply per message. NOTE(review): unbounded — entries are never removed. */
    private final java.util.Map<String, Mono<String>> responseCache =
            new java.util.concurrent.ConcurrentHashMap<>();
    /** Shared id sequence so each aggregation gets a distinct id. */
    private final AtomicInteger aggregateSequence = new AtomicInteger(0);

    public ReactiveChatService(StreamingChatModel streamingChatModel) {
        this.streamingChatModel = streamingChatModel;
    }

    /**
     * Performs a blocking model call on the bounded-elastic scheduler.
     *
     * @return a Mono of the reply text
     */
    public Mono<String> chatReactive(String message) {
        return Mono.fromCallable(() -> {
                    ChatResponse response = streamingChatModel.call(new Prompt(message));
                    return response.getResult().getOutput().getContent();
                })
                .subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Sends the messages in parallel; each failure becomes a failed result instead of
     * terminating the batch. Result order follows completion, not input order.
     */
    public Flux<ReactiveChatResult> chatBatchReactive(List<String> messages) {
        return Flux.fromIterable(messages)
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(message ->
                        chatReactive(message)
                                .map(response -> new ReactiveChatResult(message, response, true, null))
                                // onErrorReturn takes a value, not a function; onErrorResume
                                // lets the fallback use the actual error.
                                .onErrorResume(throwable -> Mono.just(new ReactiveChatResult(
                                        message, null, false, throwable.getMessage())))
                )
                .sequential();
    }

    /**
     * Streams the reply, subscribing on bounded-elastic and publishing on parallel.
     * Only null/empty fragments are dropped; whitespace-only fragments are part of the text.
     */
    public Flux<String> chatStreamReactive(String message) {
        return streamingChatModel.streamContent(message)
                .subscribeOn(Schedulers.boundedElastic())
                .publishOn(Schedulers.parallel())
                .filter(content -> content != null && !content.isEmpty());
    }

    /**
     * Streams the reply with a bounded buffer of 1000 fragments for slow consumers.
     */
    public Flux<String> chatStreamWithBackpressure(String message) {
        return streamingChatModel.streamContent(message)
                .onBackpressureBuffer(1000)
                .filter(content -> content != null && !content.isEmpty());
    }

    /**
     * Retries failed calls up to 3 times with exponential backoff (1s initial, 10s max),
     * except for {@link IllegalArgumentException}.
     */
    public Mono<String> chatReactiveWithRetry(String message) {
        return chatReactive(message)
                .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                        .maxBackoff(Duration.ofSeconds(10))
                        .filter(throwable -> !(throwable instanceof IllegalArgumentException)));
    }

    /**
     * Falls back to a notice when no reply arrives within {@code timeout}.
     * Only the timeout error is converted; other errors propagate.
     */
    public Mono<String> chatReactiveWithTimeout(String message, Duration timeout) {
        return chatReactive(message)
                .timeout(timeout)
                .onErrorReturn(java.util.concurrent.TimeoutException.class, "请求超时,请稍后重试");
    }

    /**
     * Returns the reply for {@code message}, reusing a successful reply for {@link #CACHE_TTL}.
     *
     * <p>The cached Mono is stored per message so it is shared across calls; caching a
     * freshly built Mono on every call would never produce a hit. Errors and empty
     * results get a zero TTL, so they are retried on the next subscription.
     */
    public Mono<String> chatReactiveWithCache(String message) {
        return responseCache.computeIfAbsent(message, key -> chatReactive(key).cache(
                value -> CACHE_TTL,
                error -> Duration.ZERO,
                () -> Duration.ZERO));
    }

    /**
     * Sends the messages concurrently and aggregates the replies in input order.
     *
     * <p>{@code flatMapSequential} keeps concurrency but emits replies in the order of
     * {@code messages}, so the combined text reads in request order.
     */
    public Mono<AggregatedResult> chatAggregateReactive(List<String> messages) {
        return Flux.fromIterable(messages)
                .flatMapSequential(this::chatReactive)
                .collectList()
                .map(responses -> new AggregatedResult(
                        aggregateSequence.incrementAndGet(),
                        responses,
                        String.join("\n\n", responses),
                        responses.size()
                ));
    }

    /**
     * Streams fragments while {@code condition} holds; emits a notice if none qualify.
     */
    public Flux<String> chatConditionalReactive(String message,
                                                java.util.function.Predicate<String> condition) {
        return chatStreamReactive(message)
                .takeWhile(condition)
                .switchIfEmpty(Flux.just("没有满足条件的响应"));
    }

    /**
     * Streams the reply, replacing a terminal error with a categorized notice.
     */
    public Flux<String> chatStreamWithErrorRecovery(String message) {
        return streamingChatModel.streamContent(message)
                .onErrorResume(throwable -> {
                    if (throwable instanceof java.util.concurrent.TimeoutException) {
                        return Flux.just("[超时] 响应时间过长");
                    } else if (throwable instanceof IllegalArgumentException) {
                        return Flux.just("[参数错误] 请检查输入参数");
                    } else {
                        return Flux.just("[系统错误] " + throwable.getMessage());
                    }
                });
    }

    /** Outcome of one message in a reactive batch. */
    public record ReactiveChatResult(
            String message,
            String response,
            boolean success,
            String error
    ) {}

    /** Aggregated replies of one aggregation call. */
    public record AggregatedResult(
            int id,
            List<String> responses,
            String combined,
            int count
    ) {}
}
7.4 WebSocket流式集成
7.4.1 WebSocket配置
// WebSocketConfig.java
package com.example.springai.config;
import com.example.springai.websocket.StreamingChatWebSocketHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final StreamingChatWebSocketHandler streamingChatHandler;
public WebSocketConfig(StreamingChatWebSocketHandler streamingChatHandler) {
this.streamingChatHandler = streamingChatHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(streamingChatHandler, "/ws/streaming-chat")
.setAllowedOrigins("*"); // 生产环境中应该限制域名
}
}
7.4.2 WebSocket处理器
// StreamingChatWebSocketHandler.java
package com.example.springai.websocket;
import com.example.springai.service.StreamingChatService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import reactor.core.Disposable;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
@Component
public class StreamingChatWebSocketHandler implements WebSocketHandler {
private final StreamingChatService streamingChatService;
private final ObjectMapper objectMapper;
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final Map<String, Disposable> subscriptions = new ConcurrentHashMap<>();
public StreamingChatWebSocketHandler(StreamingChatService streamingChatService,
ObjectMapper objectMapper) {
this.streamingChatService = streamingChatService;
this.objectMapper = objectMapper;
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String sessionId = session.getId();
sessions.put(sessionId, session);
// 发送连接确认
WebSocketMessage message = new WebSocketMessage(
"connection", "已连接到流式聊天服务", sessionId, System.currentTimeMillis());
sendMessage(session, message);
}
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message)
throws Exception {
if (message instanceof TextMessage textMessage) {
try {
ChatRequest request = objectMapper.readValue(
textMessage.getPayload(), ChatRequest.class);
handleChatRequest(session, request);
} catch (Exception e) {
sendError(session, "消息解析失败: " + e.getMessage());
}
}
}
private void handleChatRequest(WebSocketSession session, ChatRequest request) {
String sessionId = session.getId();
// 取消之前的订阅
Disposable previousSubscription = subscriptions.get(sessionId);
if (previousSubscription != null && !previousSubscription.isDisposed()) {
previousSubscription.dispose();
}
// 开始流式聊天
Disposable subscription = streamingChatService.streamChat(request.message())
.subscribe(
chunk -> {
WebSocketMessage response = new WebSocketMessage(
"chunk", chunk, sessionId, System.currentTimeMillis());
try {
sendMessage(session, response);
} catch (IOException e) {
// 处理发送错误
}
},
error -> {
sendError(session, "流式处理错误: " + error.getMessage());
},
() -> {
WebSocketMessage complete = new WebSocketMessage(
"complete", "流式响应完成", sessionId, System.currentTimeMillis());
try {
sendMessage(session, complete);
} catch (IOException e) {
// 处理发送错误
}
}
);
subscriptions.put(sessionId, subscription);
}
private void sendMessage(WebSocketSession session, WebSocketMessage message)
throws IOException {
if (session.isOpen()) {
String json = objectMapper.writeValueAsString(message);
session.sendMessage(new TextMessage(json));
}
}
private void sendError(WebSocketSession session, String error) {
try {
WebSocketMessage errorMessage = new WebSocketMessage(
"error", error, session.getId(), System.currentTimeMillis());
sendMessage(session, errorMessage);
} catch (IOException e) {
// 记录日志
}
}
@Override
public void handleTransportError(WebSocketSession session, Throwable exception)
throws Exception {
sendError(session, "传输错误: " + exception.getMessage());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus)
throws Exception {
String sessionId = session.getId();
// 清理资源
sessions.remove(sessionId);
Disposable subscription = subscriptions.remove(sessionId);
if (subscription != null && !subscription.isDisposed()) {
subscription.dispose();
}
}
@Override
public boolean supportsPartialMessages() {
return false;
}
// 消息记录类
public record ChatRequest(String message) {}
public record WebSocketMessage(
String type,
String content,
String sessionId,
long timestamp
) {}
}
7.5 流式控制器
7.5.1 REST流式接口
// StreamingController.java
package com.example.springai.controller;
import com.example.springai.service.StreamingChatService;
import com.example.springai.service.AdvancedStreamingService;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
/**
 * HTTP endpoints exposing the streaming chat services.
 */
@RestController
@RequestMapping("/api/streaming")
public class StreamingController {
    private final StreamingChatService streamingChatService;
    private final AdvancedStreamingService advancedStreamingService;

    public StreamingController(StreamingChatService streamingChatService,
                               AdvancedStreamingService advancedStreamingService) {
        this.streamingChatService = streamingChatService;
        this.advancedStreamingService = advancedStreamingService;
    }

    /**
     * Streams the reply as plain text.
     */
    @PostMapping(value = "/chat", produces = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> streamChat(@RequestBody ChatRequest request) {
        return streamingChatService.streamChat(request.message());
    }

    /**
     * Streams the reply as Server-Sent Events, one event per fragment.
     *
     * <p>Fragments are returned unchanged: for {@code text/event-stream} Spring writes each
     * element as its own {@code data:} event, splitting embedded newlines into separate
     * {@code data:} lines. Adding {@code "data: "} and {@code "\n\n"} by hand would double
     * the framing and break on fragments that contain newlines.
     */
    @PostMapping(value = "/chat/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChatSSE(@RequestBody ChatRequest request) {
        return streamingChatService.streamChat(request.message());
    }

    /**
     * Streams each fragment with the text accumulated so far, as NDJSON.
     */
    @PostMapping(value = "/chat/accumulated", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<StreamingChatService.AccumulatedResponse> streamChatAccumulated(
            @RequestBody ChatRequest request) {
        return streamingChatService.streamChatWithAccumulation(request.message());
    }

    /**
     * Streams the reply with a per-fragment timeout (seconds, default 30).
     */
    @PostMapping(value = "/chat/timeout", produces = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> streamChatWithTimeout(
            @RequestBody ChatRequest request,
            @RequestParam(defaultValue = "30") int timeoutSeconds) {
        return streamingChatService.streamChatWithTimeout(
                request.message(), Duration.ofSeconds(timeoutSeconds));
    }

    /**
     * Streams the reply in batches of {@code batchSize} fragments (default 5), as NDJSON.
     */
    @PostMapping(value = "/chat/batched", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<AdvancedStreamingService.BatchResponse> streamChatBatched(
            @RequestBody ChatRequest request,
            @RequestParam(defaultValue = "5") int batchSize) {
        return advancedStreamingService.streamInBatches(request.message(), batchSize);
    }

    /**
     * Streams the reply with running statistics per fragment, as NDJSON.
     */
    @PostMapping(value = "/chat/monitored", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<AdvancedStreamingService.MonitoredResponse> streamChatMonitored(
            @RequestBody ChatRequest request) {
        return advancedStreamingService.streamWithMonitoring(request.message());
    }

    /**
     * Streams a multi-turn conversation reply, as NDJSON.
     */
    @PostMapping(value = "/conversation", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<StreamingChatService.ConversationResponse> streamConversation(
            @RequestBody StreamingChatService.ConversationRequest request) {
        return streamingChatService.streamConversation(request);
    }

    /**
     * Processes several messages in parallel and streams one collected reply per message.
     */
    @PostMapping(value = "/chat/parallel", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<AdvancedStreamingService.ParallelResponse> streamChatParallel(
            @RequestBody ParallelChatRequest request) {
        return advancedStreamingService.streamParallel(request.messages());
    }

    /**
     * Streams the reply server-side and returns it as one JSON document.
     */
    @PostMapping(value = "/chat/collect", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<CollectedResponse> streamChatAndCollect(@RequestBody ChatRequest request) {
        return streamingChatService.streamChatAndCollect(request.message())
                .map(content -> new CollectedResponse(content, content.length()));
    }

    /** Single-message request body. */
    public record ChatRequest(String message) {}

    /** Multi-message request body. */
    public record ParallelChatRequest(List<String> messages) {}

    /** Collected reply and its length in chars. */
    public record CollectedResponse(String content, int length) {}
}
7.5.2 异步控制器
// AsyncController.java
package com.example.springai.controller;
import com.example.springai.service.AsyncChatService;
import com.example.springai.service.ReactiveChatService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
 * HTTP endpoints for the CompletableFuture- and Reactor-based chat services.
 */
@RestController
@RequestMapping("/api/async")
public class AsyncController {
    private final AsyncChatService asyncChatService;
    private final ReactiveChatService reactiveChatService;

    public AsyncController(AsyncChatService asyncChatService,
                           ReactiveChatService reactiveChatService) {
        this.asyncChatService = asyncChatService;
        this.reactiveChatService = reactiveChatService;
    }

    /**
     * Asynchronous chat; failures are reported with a 500 status.
     */
    @PostMapping("/chat")
    public CompletableFuture<ResponseEntity<String>> chatAsync(
            @RequestBody ChatRequest request) {
        return asyncChatService.chatAsync(request.message())
                .thenApply(ResponseEntity::ok)
                .exceptionally(throwable -> errorResponse("错误: ", throwable));
    }

    /**
     * Batch chat; per-message failures are included in the results.
     */
    @PostMapping("/chat/batch")
    public CompletableFuture<List<AsyncChatService.AsyncChatResult>> chatBatchAsync(
            @RequestBody BatchChatRequest request) {
        return asyncChatService.chatBatchAsync(request.messages());
    }

    /**
     * Chat with a timeout in milliseconds (default 30000); a timeout yields 504,
     * any other failure 500.
     */
    @PostMapping("/chat/timeout")
    public CompletableFuture<ResponseEntity<String>> chatAsyncWithTimeout(
            @RequestBody ChatRequest request,
            @RequestParam(defaultValue = "30000") long timeoutMs) {
        return asyncChatService.chatAsyncWithTimeout(request.message(), timeoutMs)
                .thenApply(ResponseEntity::ok)
                .exceptionally(throwable -> errorResponse("超时或错误: ", throwable));
    }

    /**
     * Chained chat where each message is prefixed with the previous reply.
     */
    @PostMapping("/chat/chain")
    public CompletableFuture<String> chatChainAsync(
            @RequestBody ChainChatRequest request) {
        return asyncChatService.chatChainAsync(request.messages());
    }

    /**
     * Batch chat with the successful replies merged into one text.
     */
    @PostMapping("/chat/merge")
    public CompletableFuture<AsyncChatService.MergedChatResult> chatMergeAsync(
            @RequestBody BatchChatRequest request) {
        return asyncChatService.chatMergeAsync(request.messages());
    }

    /** Reactive single chat. */
    @PostMapping("/reactive/chat")
    public Mono<String> chatReactive(@RequestBody ChatRequest request) {
        return reactiveChatService.chatReactive(request.message());
    }

    /** Reactive parallel batch chat. */
    @PostMapping("/reactive/batch")
    public Flux<ReactiveChatService.ReactiveChatResult> chatBatchReactive(
            @RequestBody BatchChatRequest request) {
        return reactiveChatService.chatBatchReactive(request.messages());
    }

    /** Reactive streaming chat. */
    @PostMapping("/reactive/stream")
    public Flux<String> chatStreamReactive(@RequestBody ChatRequest request) {
        return reactiveChatService.chatStreamReactive(request.message());
    }

    /** Reactive chat with backoff retry. */
    @PostMapping("/reactive/retry")
    public Mono<String> chatReactiveWithRetry(@RequestBody ChatRequest request) {
        return reactiveChatService.chatReactiveWithRetry(request.message());
    }

    /** Reactive chat with a timeout in seconds (default 30). */
    @PostMapping("/reactive/timeout")
    public Mono<String> chatReactiveWithTimeout(
            @RequestBody ChatRequest request,
            @RequestParam(defaultValue = "30") int timeoutSeconds) {
        return reactiveChatService.chatReactiveWithTimeout(
                request.message(), Duration.ofSeconds(timeoutSeconds));
    }

    /** Reactive aggregation of several replies. */
    @PostMapping("/reactive/aggregate")
    public Mono<ReactiveChatService.AggregatedResult> chatAggregateReactive(
            @RequestBody BatchChatRequest request) {
        return reactiveChatService.chatAggregateReactive(request.messages());
    }

    /**
     * Builds the error response for a failed future.
     *
     * <p>Unwraps {@link java.util.concurrent.CompletionException} so the client sees the
     * real cause's message. Timeouts map to 504 and all other failures to 500, since
     * neither is caused by a malformed request.
     */
    private static ResponseEntity<String> errorResponse(String prefix, Throwable throwable) {
        Throwable cause = throwable;
        while (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        org.springframework.http.HttpStatus status =
                cause instanceof java.util.concurrent.TimeoutException
                        ? org.springframework.http.HttpStatus.GATEWAY_TIMEOUT
                        : org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR;
        return ResponseEntity.status(status).body(prefix + cause.getMessage());
    }

    /** Single-message request body. */
    public record ChatRequest(String message) {}

    /** Multi-message request body for batch endpoints. */
    public record BatchChatRequest(List<String> messages) {}

    /** Multi-message request body for the chain endpoint. */
    public record ChainChatRequest(List<String> messages) {}
}
7.6 配置文件
7.6.1 应用配置
# application.yml
spring:
  ai:
    openai:
      api-key: ${OPENAI_API_KEY}
      base-url: https://api.openai.com
      chat:
        options:
          model: gpt-3.5-turbo
          temperature: 0.7
          max-tokens: 1000
          stream: true
  # WebSocket配置
  websocket:
    allowed-origins: "*"
  # 异步配置
  task:
    execution:
      pool:
        core-size: 8
        max-size: 20
        queue-capacity: 100
        keep-alive: 60s
      thread-name-prefix: "async-chat-"
# Reactor配置
reactor:
  netty:
    pool:
      max-connections: 100
      max-idle-time: 30s
# 流式处理配置
streaming:
  chat:
    timeout: 30s
    buffer-size: 1000
    backpressure-strategy: buffer
    retry:
      max-attempts: 3
      delay: 1s
      max-delay: 10s
  websocket:
    max-sessions: 1000
    session-timeout: 300s
  monitoring:
    enabled: true
    metrics-interval: 5s
# 异步处理配置
async:
  chat:
    thread-pool-size: 10
    timeout: 60s
    retry:
      enabled: true
      max-attempts: 3
      delay: 2s
  batch:
    max-size: 50
    timeout: 120s
    parallel-degree: 5
# 日志配置
logging:
  level:
    com.example.springai.service: DEBUG
    reactor.netty: INFO
    org.springframework.web.socket: DEBUG
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"
7.6.2 流式处理配置类
// StreamingProperties.java
package com.example.springai.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.time.Duration;
/**
 * Typed binding for the {@code streaming.*} configuration tree.
 *
 * <p>Every value carries an in-code default, so missing keys fall back to these values.
 */
@Component
@ConfigurationProperties(prefix = "streaming")
public class StreamingProperties {

    /** {@code streaming.chat.*} */
    private Chat chat = new Chat();

    /** {@code streaming.websocket.*} */
    private WebSocket websocket = new WebSocket();

    /** {@code streaming.monitoring.*} */
    private Monitoring monitoring = new Monitoring();

    public Chat getChat() {
        return this.chat;
    }

    public void setChat(Chat chat) {
        this.chat = chat;
    }

    public WebSocket getWebsocket() {
        return this.websocket;
    }

    public void setWebsocket(WebSocket websocket) {
        this.websocket = websocket;
    }

    public Monitoring getMonitoring() {
        return this.monitoring;
    }

    public void setMonitoring(Monitoring monitoring) {
        this.monitoring = monitoring;
    }

    /**
     * Chat stream settings: timeout, buffer size, back-pressure strategy and retry policy.
     */
    public static class Chat {

        private Retry retry = new Retry();

        private String backpressureStrategy = "buffer";

        private int bufferSize = 1000;

        private Duration timeout = Duration.ofSeconds(30);

        public Duration getTimeout() {
            return this.timeout;
        }

        public void setTimeout(Duration timeout) {
            this.timeout = timeout;
        }

        public int getBufferSize() {
            return this.bufferSize;
        }

        public void setBufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
        }

        public String getBackpressureStrategy() {
            return this.backpressureStrategy;
        }

        public void setBackpressureStrategy(String backpressureStrategy) {
            this.backpressureStrategy = backpressureStrategy;
        }

        public Retry getRetry() {
            return this.retry;
        }

        public void setRetry(Retry retry) {
            this.retry = retry;
        }
    }

    /**
     * Retry policy: attempt count plus initial and maximum delay.
     */
    public static class Retry {

        private Duration maxDelay = Duration.ofSeconds(10);

        private Duration delay = Duration.ofSeconds(1);

        private int maxAttempts = 3;

        public int getMaxAttempts() {
            return this.maxAttempts;
        }

        public void setMaxAttempts(int maxAttempts) {
            this.maxAttempts = maxAttempts;
        }

        public Duration getDelay() {
            return this.delay;
        }

        public void setDelay(Duration delay) {
            this.delay = delay;
        }

        public Duration getMaxDelay() {
            return this.maxDelay;
        }

        public void setMaxDelay(Duration maxDelay) {
            this.maxDelay = maxDelay;
        }
    }

    /**
     * WebSocket session limits.
     */
    public static class WebSocket {

        private Duration sessionTimeout = Duration.ofMinutes(5);

        private int maxSessions = 1000;

        public int getMaxSessions() {
            return this.maxSessions;
        }

        public void setMaxSessions(int maxSessions) {
            this.maxSessions = maxSessions;
        }

        public Duration getSessionTimeout() {
            return this.sessionTimeout;
        }

        public void setSessionTimeout(Duration sessionTimeout) {
            this.sessionTimeout = sessionTimeout;
        }
    }

    /**
     * Monitoring switch and metrics interval.
     */
    public static class Monitoring {

        private Duration metricsInterval = Duration.ofSeconds(5);

        private boolean enabled = true;

        public boolean isEnabled() {
            return this.enabled;
        }

        public void setEnabled(boolean enabled) {
            this.enabled = enabled;
        }

        public Duration getMetricsInterval() {
            return this.metricsInterval;
        }

        public void setMetricsInterval(Duration metricsInterval) {
            this.metricsInterval = metricsInterval;
        }
    }
}
7.7 本章总结
7.7.1 核心要点
流式处理优势
- 实时反馈,改善用户体验
- 降低首字节时间(TTFB)
- 减少内存占用和网络缓冲
- 支持长文本生成场景
Spring AI流式架构
- StreamingChatModel接口
- Flux响应式流处理
- 背压处理和错误恢复
- 流式响应累积和监控
异步编程模式
- CompletableFuture异步处理
- 响应式编程(Reactor)
- 并行处理和批量操作
- 超时和重试机制
WebSocket集成
- 实时双向通信
- 会话管理和资源清理
- 错误处理和连接监控
- 消息序列化和反序列化
7.7.2 最佳实践
流式处理优化
- 合理设置缓冲区大小
- 实现背压处理策略
- 监控流式处理性能
- 处理网络中断和重连
异步编程规范
- 避免阻塞操作
- 合理配置线程池
- 实现超时和取消机制
- 处理异常和错误恢复
资源管理
- 及时释放订阅资源
- 限制并发连接数
- 监控内存使用情况
- 实现优雅关闭机制
7.7.3 练习题
基础练习
- 实现一个简单的流式聊天服务
- 创建WebSocket聊天室功能
- 添加流式响应监控和统计
进阶练习
- 实现多模型并行流式处理
- 创建自适应背压处理策略
- 开发流式响应缓存机制
高级练习
- 构建分布式流式处理系统
- 实现流式响应的A/B测试
- 开发智能流式响应优化算法
实战项目
- 创建实时AI助手应用
- 构建多人协作AI写作平台
- 开发AI驱动的实时翻译系统