7.1 Streaming Fundamentals

7.1.1 Streaming Concepts

Streaming is a key capability of modern AI applications: instead of waiting for the complete response and returning it in one shot, the model emits its output incrementally. This noticeably improves the user experience, especially when generating long text; the sketch after the list below shows the difference from the consumer's point of view.

Core advantages:

  1. Real-time feedback: users immediately see the AI start producing content
  2. Lower latency: reduced time to first byte (TTFB)
  3. Better experience: a typewriter-like effect that feels more natural
  4. Resource efficiency: less memory usage and network buffering
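As a minimal, illustrative sketch (not part of the chapter's codebase; chatService and its streamChat method are assumptions matching the service built in section 7.2), the difference between consuming the stream incrementally and blocking for the complete answer looks like this:

// Illustrative only: consume a streamed response chunk by chunk.
Flux<String> chunks = chatService.streamChat("Explain streaming in one paragraph");

chunks.subscribe(
    chunk -> System.out.print(chunk),                    // render each fragment as it arrives ("typewriter" effect)
    error -> System.err.println("stream failed: " + error.getMessage()),
    () -> System.out.println("\n[done]")
);

// Blocking alternative: the user sees nothing until the whole response has been generated.
String full = chatService.streamChat("Explain streaming in one paragraph")
    .collectList()
    .map(parts -> String.join("", parts))
    .block();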

7.1.2 Spring AI Streaming Architecture

// StreamingChatModel interface (simplified for illustration)
public interface StreamingChatModel extends ChatModel {
    /**
     * Streaming chat call
     */
    Flux<ChatResponse> stream(Prompt prompt);
    
    /**
     * Streaming chat call (convenience overload returning only the content)
     */
    default Flux<String> streamContent(String message) {
        return stream(new Prompt(message))
            .map(response -> response.getResult().getOutput().getContent());
    }
}

// Streaming response handler
package com.example.springai.streaming;

import org.springframework.ai.chat.model.ChatResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.util.function.Consumer;

public class StreamingResponseHandler {
    
    private final Sinks.Many<String> contentSink;
    private final Sinks.Many<ChatResponse> responseSink;
    
    public StreamingResponseHandler() {
        this.contentSink = Sinks.many().multicast().onBackpressureBuffer();
        this.responseSink = Sinks.many().multicast().onBackpressureBuffer();
    }
    
    /**
     * Consume a streaming response
     */
    public void handleStreamingResponse(Flux<ChatResponse> responseStream) {
        responseStream
            .doOnNext(response -> {
                // Forward the full ChatResponse
                responseSink.tryEmitNext(response);
                
                // Extract and forward the text content
                String content = response.getResult().getOutput().getContent();
                if (content != null && !content.isEmpty()) {
                    contentSink.tryEmitNext(content);
                }
            })
            .doOnError(error -> {
                contentSink.tryEmitError(error);
                responseSink.tryEmitError(error);
            })
            .doOnComplete(() -> {
                contentSink.tryEmitComplete();
                responseSink.tryEmitComplete();
            })
            .subscribe();
    }
    
    /**
     * Get the content stream
     */
    public Flux<String> getContentStream() {
        return contentSink.asFlux();
    }
    
    /**
     * Get the response stream
     */
    public Flux<ChatResponse> getResponseStream() {
        return responseSink.asFlux();
    }
    
    /**
     * Register a content listener
     */
    public void onContent(Consumer<String> contentConsumer) {
        getContentStream().subscribe(contentConsumer);
    }
    
    /**
     * Register a completion listener
     */
    public void onComplete(Runnable completeCallback) {
        getContentStream()
            .subscribe(content -> { }, error -> { }, completeCallback);
    }
    
    /**
     * Register an error listener
     */
    public void onError(Consumer<Throwable> errorConsumer) {
        getContentStream()
            .subscribe(content -> { }, errorConsumer);
    }
}
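A brief usage sketch (illustrative only, and assuming the simplified StreamingChatModel interface shown above) shows how the handler is wired to a model stream:

// Illustrative usage of StreamingResponseHandler; names outside this chapter's code are assumptions.
StreamingResponseHandler handler = new StreamingResponseHandler();
handler.onContent(System.out::print);                                  // print chunks as they arrive
handler.onComplete(() -> System.out.println("\n[complete]"));
handler.onError(error -> System.err.println("error: " + error.getMessage()));

// Feed the handler with the model's streaming response.
handler.handleStreamingResponse(streamingChatModel.stream(new Prompt("Hello, Spring AI")));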

7.2 Streaming Chat Service

7.2.1 Basic Streaming Service

// StreamingChatService.java
package com.example.springai.service;

import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.StreamingChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.chat.prompt.PromptTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

@Service
public class StreamingChatService {
    
    private final StreamingChatModel streamingChatModel;
    
    public StreamingChatService(StreamingChatModel streamingChatModel) {
        this.streamingChatModel = streamingChatModel;
    }
    
    /**
     * Basic streaming chat
     */
    public Flux<String> streamChat(String message) {
        return streamingChatModel.streamContent(message)
            .filter(content -> content != null && !content.trim().isEmpty());
    }
    
    /**
     * Streaming chat driven by a prompt template
     */
    public Flux<String> streamChatWithTemplate(String template, Map<String, Object> variables) {
        PromptTemplate promptTemplate = new PromptTemplate(template);
        Prompt prompt = promptTemplate.create(variables);
        
        return streamingChatModel.stream(prompt)
            .map(response -> response.getResult().getOutput().getContent())
            .filter(content -> content != null && !content.trim().isEmpty());
    }
    
    /**
     * Streaming chat with accumulated content
     */
    public Flux<AccumulatedResponse> streamChatWithAccumulation(String message) {
        AtomicReference<StringBuilder> accumulator = new AtomicReference<>(new StringBuilder());
        
        return streamChat(message)
            .map(chunk -> {
                StringBuilder sb = accumulator.get();
                sb.append(chunk);
                return new AccumulatedResponse(chunk, sb.toString());
            });
    }
    
    /**
     * Streaming chat with a timeout
     */
    public Flux<String> streamChatWithTimeout(String message, Duration timeout) {
        return streamChat(message)
            .timeout(timeout)
            .onErrorResume(throwable -> 
                Flux.just("[超时] 响应时间过长,请稍后重试"));
    }
    
    /**
     * Collect the complete streamed response
     */
    public Mono<String> streamChatAndCollect(String message) {
        return streamChat(message)
            .collectList()
            .map(chunks -> String.join("", chunks));
    }
    
    /**
     * Multi-turn conversation streaming
     */
    public Flux<ConversationResponse> streamConversation(ConversationRequest request) {
        StringBuilder conversationHistory = new StringBuilder();
        
        // Build the conversation history
        request.history().forEach(turn -> {
            conversationHistory.append("User: ").append(turn.userMessage()).append("\n");
            conversationHistory.append("Assistant: ").append(turn.assistantMessage()).append("\n");
        });
        
        // Append the current user message
        conversationHistory.append("User: ").append(request.currentMessage()).append("\n");
        conversationHistory.append("Assistant: ");
        
        return streamChat(conversationHistory.toString())
            .map(chunk -> new ConversationResponse(
                request.conversationId(),
                chunk,
                System.currentTimeMillis()
            ));
    }
    
    /**
     * Accumulated response record
     */
    public record AccumulatedResponse(
        String chunk,
        String accumulated
    ) {}
    
    /**
     * Conversation request
     */
    public record ConversationRequest(
        String conversationId,
        String currentMessage,
        java.util.List<ConversationTurn> history
    ) {}
    
    /**
     * Conversation turn
     */
    public record ConversationTurn(
        String userMessage,
        String assistantMessage
    ) {}
    
    /**
     * Conversation response
     */
    public record ConversationResponse(
        String conversationId,
        String chunk,
        long timestamp
    ) {}
}

7.2.2 Advanced Streaming

// AdvancedStreamingService.java
package com.example.springai.service;

import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.StreamingChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

@Service
public class AdvancedStreamingService {
    
    private final StreamingChatModel streamingChatModel;
    
    public AdvancedStreamingService(StreamingChatModel streamingChatModel) {
        this.streamingChatModel = streamingChatModel;
    }
    
    /**
     * Rate-limited streaming
     */
    public Flux<String> streamWithRateLimit(String message, Duration interval) {
        return streamingChatModel.streamContent(message)
            .delayElements(interval)
            .filter(content -> content != null && !content.trim().isEmpty());
    }
    
    /**
     * Streamed response in batches
     */
    public Flux<BatchResponse> streamInBatches(String message, int batchSize) {
        AtomicInteger counter = new AtomicInteger(0);
        
        return streamingChatModel.streamContent(message)
            .filter(content -> content != null && !content.trim().isEmpty())
            .buffer(batchSize)
            .map(batch -> new BatchResponse(
                counter.incrementAndGet(),
                batch,
                String.join("", batch)
            ));
    }
    
    /**
     * Streaming with a retry mechanism
     */
    public Flux<String> streamWithRetry(String message, int maxRetries) {
        return streamingChatModel.streamContent(message)
            .retry(maxRetries)
            .onErrorResume(throwable -> 
                Flux.just("[错误] 处理失败: " + throwable.getMessage()));
    }
    
    /**
     * Parallel streaming over multiple prompts
     */
    public Flux<ParallelResponse> streamParallel(java.util.List<String> messages) {
        return Flux.fromIterable(messages)
            .parallel()
            .runOn(Schedulers.parallel())
            .flatMap(message -> 
                streamingChatModel.streamContent(message)
                    .collectList()
                    .map(chunks -> new ParallelResponse(
                        message,
                        String.join("", chunks),
                        Thread.currentThread().getName()
                    ))
            )
            .sequential();
    }
    
    /**
     * Streaming with monitoring
     */
    public Flux<MonitoredResponse> streamWithMonitoring(String message) {
        AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
        AtomicInteger chunkCount = new AtomicInteger(0);
        AtomicInteger totalLength = new AtomicInteger(0);
        
        return streamingChatModel.streamContent(message)
            .filter(content -> content != null && !content.trim().isEmpty())
            .map(chunk -> {
                int count = chunkCount.incrementAndGet();
                int length = totalLength.addAndGet(chunk.length());
                long elapsed = System.currentTimeMillis() - startTime.get();
                
                return new MonitoredResponse(
                    chunk,
                    count,
                    length,
                    elapsed,
                    calculateThroughput(length, elapsed)
                );
            });
    }
    
    /**
     * Conditional streaming
     */
    public Flux<String> streamWithCondition(String message, 
                                           Function<String, Boolean> condition) {
        return streamingChatModel.streamContent(message)
            .filter(content -> content != null && !content.trim().isEmpty())
            .takeWhile(condition::apply);
    }
    
    /**
     * Streaming with transformation
     */
    public Flux<TransformedResponse> streamWithTransformation(String message,
                                                             Function<String, String> transformer) {
        return streamingChatModel.streamContent(message)
            .filter(content -> content != null && !content.trim().isEmpty())
            .map(chunk -> new TransformedResponse(
                chunk,
                transformer.apply(chunk)
            ));
    }
    
    /**
     * Calculate throughput
     */
    private double calculateThroughput(int totalLength, long elapsedMs) {
        if (elapsedMs == 0) return 0.0;
        return (double) totalLength / elapsedMs * 1000; // characters per second
    }
    
    // Response records
    public record BatchResponse(
        int batchNumber,
        java.util.List<String> chunks,
        String content
    ) {}
    
    public record ParallelResponse(
        String originalMessage,
        String response,
        String threadName
    ) {}
    
    public record MonitoredResponse(
        String chunk,
        int chunkCount,
        int totalLength,
        long elapsedMs,
        double throughput
    ) {}
    
    public record TransformedResponse(
        String original,
        String transformed
    ) {}
}

7.3 Asynchronous Programming Patterns

7.3.1 Asynchronous Chat Service

// AsyncChatService.java
package com.example.springai.service;

import org.springframework.ai.chat.model.ChatModel;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

@Service
public class AsyncChatService {
    
    private final ChatModel chatModel;
    private final Executor asyncExecutor;
    
    public AsyncChatService(ChatModel chatModel) {
        this.chatModel = chatModel;
        this.asyncExecutor = Executors.newFixedThreadPool(10);
    }
    
    /**
     * Asynchronous chat, executed on the service's own executor via supplyAsync
     */
    public CompletableFuture<String> chatAsync(String message) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                ChatResponse response = chatModel.call(new Prompt(message));
                return response.getResult().getOutput().getContent();
            } catch (Exception e) {
                throw new RuntimeException("异步聊天失败: " + e.getMessage(), e);
            }
        }, asyncExecutor);
    }
    
    /**
     * Batch asynchronous chat
     */
    public CompletableFuture<List<AsyncChatResult>> chatBatchAsync(List<String> messages) {
        List<CompletableFuture<AsyncChatResult>> futures = messages.stream()
            .map(message -> 
                chatAsync(message)
                    .thenApply(response -> new AsyncChatResult(message, response, true, null))
                    .exceptionally(throwable -> new AsyncChatResult(
                        message, null, false, throwable.getMessage()))
            )
            .collect(Collectors.toList());
        
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
    
    /**
     * Asynchronous chat with a timeout
     */
    public CompletableFuture<String> chatAsyncWithTimeout(String message, long timeoutMs) {
        CompletableFuture<String> chatFuture = chatAsync(message);
        
        CompletableFuture<String> timeoutFuture = new CompletableFuture<>();
        
        // Complete the timeout future exceptionally once the deadline passes
        CompletableFuture.delayedExecutor(timeoutMs, java.util.concurrent.TimeUnit.MILLISECONDS)
            .execute(() -> timeoutFuture.completeExceptionally(
                new java.util.concurrent.TimeoutException("Chat request timed out")));
        
        return chatFuture.applyToEither(timeoutFuture, Function.identity());
    }
    
    /**
     * Chained asynchronous chat
     */
    public CompletableFuture<String> chatChainAsync(List<String> messages) {
        CompletableFuture<String> result = CompletableFuture.completedFuture("");
        
        for (String message : messages) {
            result = result.thenCompose(previousResponse -> {
                String combinedMessage = previousResponse.isEmpty() ? 
                    message : previousResponse + "\n\n" + message;
                return chatAsync(combinedMessage);
            });
        }
        
        return result;
    }
    
    /**
     * Conditional asynchronous chat
     */
    public CompletableFuture<String> chatAsyncWithCondition(
            String message, 
            java.util.function.Predicate<String> condition) {
        
        return chatAsync(message)
            .thenCompose(response -> {
                if (condition.test(response)) {
                    return CompletableFuture.completedFuture(response);
                } else {
                    // Regenerate if the condition is not satisfied
                    return chatAsync(message + "\nPlease regenerate a more suitable answer.");
                }
            });
    }
    
    /**
     * Merge asynchronous chat results
     */
    public CompletableFuture<MergedChatResult> chatMergeAsync(List<String> messages) {
        return chatBatchAsync(messages)
            .thenApply(results -> {
                List<String> successResponses = results.stream()
                    .filter(AsyncChatResult::success)
                    .map(AsyncChatResult::response)
                    .collect(Collectors.toList());
                
                List<String> errors = results.stream()
                    .filter(result -> !result.success())
                    .map(AsyncChatResult::error)
                    .collect(Collectors.toList());
                
                return new MergedChatResult(
                    String.join("\n\n", successResponses),
                    successResponses.size(),
                    errors.size(),
                    errors
                );
            });
    }
    
    // Result records
    public record AsyncChatResult(
        String message,
        String response,
        boolean success,
        String error
    ) {}
    
    public record MergedChatResult(
        String mergedResponse,
        int successCount,
        int errorCount,
        List<String> errors
    ) {}
}

7.3.2 Reactive Programming Integration

// ReactiveChatService.java
package com.example.springai.service;

import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.ai.chat.model.StreamingChatModel;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class ReactiveChatService {
    
    private final StreamingChatModel streamingChatModel;
    
    public ReactiveChatService(StreamingChatModel streamingChatModel) {
        this.streamingChatModel = streamingChatModel;
    }
    
    /**
     * Reactive chat
     */
    public Mono<String> chatReactive(String message) {
        return Mono.fromCallable(() -> {
            ChatResponse response = streamingChatModel.call(new Prompt(message));
            return response.getResult().getOutput().getContent();
        })
        .subscribeOn(Schedulers.boundedElastic());
    }
    
    /**
     * Reactive batch processing
     */
    public Flux<ReactiveChatResult> chatBatchReactive(List<String> messages) {
        return Flux.fromIterable(messages)
            .parallel()
            .runOn(Schedulers.parallel())
            .flatMap(message -> 
                chatReactive(message)
                    .map(response -> new ReactiveChatResult(message, response, true, null))
                    .onErrorResume(throwable -> Mono.just(new ReactiveChatResult(
                        message, null, false, throwable.getMessage())))
            )
            .sequential();
    }
    
    /**
     * Reactive streaming chat
     */
    public Flux<String> chatStreamReactive(String message) {
        return streamingChatModel.streamContent(message)
            .subscribeOn(Schedulers.boundedElastic())
            .publishOn(Schedulers.parallel())
            .filter(content -> content != null && !content.trim().isEmpty());
    }
    
    /**
     * Streaming chat with backpressure handling
     */
    public Flux<String> chatStreamWithBackpressure(String message) {
        return streamingChatModel.streamContent(message)
            .onBackpressureBuffer(1000)
            .filter(content -> content != null && !content.trim().isEmpty());
    }
    
    /**
     * Reactive retry mechanism
     */
    public Mono<String> chatReactiveWithRetry(String message) {
        return chatReactive(message)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                .maxBackoff(Duration.ofSeconds(10))
                .filter(throwable -> !(throwable instanceof IllegalArgumentException)));
    }
    
    /**
     * Reactive timeout handling
     */
    public Mono<String> chatReactiveWithTimeout(String message, Duration timeout) {
        return chatReactive(message)
            .timeout(timeout)
            .onErrorReturn("请求超时,请稍后重试");
    }
    
    /**
     * Reactive caching
     */
    public Mono<String> chatReactiveWithCache(String message) {
        return chatReactive(message)
            .cache(Duration.ofMinutes(5)); // cache the result for 5 minutes
    }
    
    /**
     * Reactive aggregation
     */
    public Mono<AggregatedResult> chatAggregateReactive(List<String> messages) {
        AtomicInteger counter = new AtomicInteger(0);
        
        return Flux.fromIterable(messages)
            .flatMap(this::chatReactive)
            .collectList()
            .map(responses -> new AggregatedResult(
                counter.incrementAndGet(),
                responses,
                String.join("\n\n", responses),
                responses.size()
            ));
    }
    
    /**
     * Reactive conditional processing
     */
    public Flux<String> chatConditionalReactive(String message, 
                                               java.util.function.Predicate<String> condition) {
        return chatStreamReactive(message)
            .takeWhile(condition)
            .switchIfEmpty(Flux.just("没有满足条件的响应"));
    }
    
    /**
     * Reactive error recovery
     */
    public Flux<String> chatStreamWithErrorRecovery(String message) {
        return streamingChatModel.streamContent(message)
            .onErrorResume(throwable -> {
                if (throwable instanceof java.util.concurrent.TimeoutException) {
                    return Flux.just("[超时] 响应时间过长");
                } else if (throwable instanceof IllegalArgumentException) {
                    return Flux.just("[参数错误] 请检查输入参数");
                } else {
                    return Flux.just("[系统错误] " + throwable.getMessage());
                }
            });
    }
    
    // Result records
    public record ReactiveChatResult(
        String message,
        String response,
        boolean success,
        String error
    ) {}
    
    public record AggregatedResult(
        int id,
        List<String> responses,
        String combined,
        int count
    ) {}
}

7.4 WebSocket Streaming Integration

7.4.1 WebSocket Configuration

// WebSocketConfig.java
package com.example.springai.config;

import com.example.springai.websocket.StreamingChatWebSocketHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    
    private final StreamingChatWebSocketHandler streamingChatHandler;
    
    public WebSocketConfig(StreamingChatWebSocketHandler streamingChatHandler) {
        this.streamingChatHandler = streamingChatHandler;
    }
    
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(streamingChatHandler, "/ws/streaming-chat")
                .setAllowedOrigins("*"); // 生产环境中应该限制域名
    }
}

7.4.2 WebSocket Handler

// StreamingChatWebSocketHandler.java
package com.example.springai.websocket;

import com.example.springai.service.StreamingChatService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import reactor.core.Disposable;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;

@Component
public class StreamingChatWebSocketHandler implements WebSocketHandler {
    
    private final StreamingChatService streamingChatService;
    private final ObjectMapper objectMapper;
    private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
    private final Map<String, Disposable> subscriptions = new ConcurrentHashMap<>();
    
    public StreamingChatWebSocketHandler(StreamingChatService streamingChatService,
                                       ObjectMapper objectMapper) {
        this.streamingChatService = streamingChatService;
        this.objectMapper = objectMapper;
    }
    
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        String sessionId = session.getId();
        sessions.put(sessionId, session);
        
        // Send a connection acknowledgement
        StreamMessage ack = new StreamMessage(
            "connection", "Connected to the streaming chat service", sessionId, System.currentTimeMillis());
        sendMessage(session, ack);
    }
    
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) 
            throws Exception {
        
        if (message instanceof TextMessage textMessage) {
            try {
                ChatRequest request = objectMapper.readValue(
                    textMessage.getPayload(), ChatRequest.class);
                
                handleChatRequest(session, request);
            } catch (Exception e) {
                sendError(session, "消息解析失败: " + e.getMessage());
            }
        }
    }
    
    private void handleChatRequest(WebSocketSession session, ChatRequest request) {
        String sessionId = session.getId();
        
        // Cancel any previous subscription for this session
        Disposable previousSubscription = subscriptions.get(sessionId);
        if (previousSubscription != null && !previousSubscription.isDisposed()) {
            previousSubscription.dispose();
        }
        
        // Start the streaming chat
        Disposable subscription = streamingChatService.streamChat(request.message())
            .subscribe(
                chunk -> {
                    StreamMessage response = new StreamMessage(
                        "chunk", chunk, sessionId, System.currentTimeMillis());
                    try {
                        sendMessage(session, response);
                    } catch (IOException e) {
                        // sending failed; the session may already be closed
                    }
                },
                error -> {
                    sendError(session, "流式处理错误: " + error.getMessage());
                },
                () -> {
                    StreamMessage complete = new StreamMessage(
                        "complete", "Streaming response complete", sessionId, System.currentTimeMillis());
                    try {
                        sendMessage(session, complete);
                    } catch (IOException e) {
                        // sending failed; the session may already be closed
                    }
                }
            );
        
        subscriptions.put(sessionId, subscription);
    }
    
    private void sendMessage(WebSocketSession session, StreamMessage message) 
            throws IOException {
        if (session.isOpen()) {
            String json = objectMapper.writeValueAsString(message);
            session.sendMessage(new TextMessage(json));
        }
    }
    
    private void sendError(WebSocketSession session, String error) {
        try {
            StreamMessage errorMessage = new StreamMessage(
                "error", error, session.getId(), System.currentTimeMillis());
            sendMessage(session, errorMessage);
        } catch (IOException e) {
            // log and ignore; the session may already be closed
        }
    }
    
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) 
            throws Exception {
        sendError(session, "传输错误: " + exception.getMessage());
    }
    
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) 
            throws Exception {
        String sessionId = session.getId();
        
        // Clean up session resources
        sessions.remove(sessionId);
        
        Disposable subscription = subscriptions.remove(sessionId);
        if (subscription != null && !subscription.isDisposed()) {
            subscription.dispose();
        }
    }
    
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }
    
    // Message records (StreamMessage avoids clashing with org.springframework.web.socket.WebSocketMessage)
    public record ChatRequest(String message) {}
    
    public record StreamMessage(
        String type,
        String content,
        String sessionId,
        long timestamp
    ) {}
}

7.5 Streaming Controllers

7.5.1 REST Streaming Endpoints

// StreamingController.java
package com.example.springai.controller;

import com.example.springai.service.StreamingChatService;
import com.example.springai.service.AdvancedStreamingService;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

@RestController
@RequestMapping("/api/streaming")
public class StreamingController {
    
    private final StreamingChatService streamingChatService;
    private final AdvancedStreamingService advancedStreamingService;
    
    public StreamingController(StreamingChatService streamingChatService,
                             AdvancedStreamingService advancedStreamingService) {
        this.streamingChatService = streamingChatService;
        this.advancedStreamingService = advancedStreamingService;
    }
    
    /**
     * Basic streaming chat
     */
    @PostMapping(value = "/chat", produces = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> streamChat(@RequestBody ChatRequest request) {
        return streamingChatService.streamChat(request.message());
    }
    
    /**
     * Server-Sent Events (SSE) streaming chat. With text/event-stream, Spring applies
     * the SSE framing ("data:" prefix and blank-line delimiter) to each element,
     * so the chunks are returned as-is here.
     */
    @PostMapping(value = "/chat/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChatSSE(@RequestBody ChatRequest request) {
        return streamingChatService.streamChat(request.message());
    }
    
    /**
     * Streaming chat with accumulated responses
     */
    @PostMapping(value = "/chat/accumulated", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<StreamingChatService.AccumulatedResponse> streamChatAccumulated(
            @RequestBody ChatRequest request) {
        return streamingChatService.streamChatWithAccumulation(request.message());
    }
    
    /**
     * Streaming chat with a timeout
     */
    @PostMapping(value = "/chat/timeout", produces = MediaType.TEXT_PLAIN_VALUE)
    public Flux<String> streamChatWithTimeout(
            @RequestBody ChatRequest request,
            @RequestParam(defaultValue = "30") int timeoutSeconds) {
        return streamingChatService.streamChatWithTimeout(
            request.message(), Duration.ofSeconds(timeoutSeconds));
    }
    
    /**
     * Batched streaming
     */
    @PostMapping(value = "/chat/batched", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<AdvancedStreamingService.BatchResponse> streamChatBatched(
            @RequestBody ChatRequest request,
            @RequestParam(defaultValue = "5") int batchSize) {
        return advancedStreamingService.streamInBatches(request.message(), batchSize);
    }
    
    /**
     * Streaming with monitoring
     */
    @PostMapping(value = "/chat/monitored", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<AdvancedStreamingService.MonitoredResponse> streamChatMonitored(
            @RequestBody ChatRequest request) {
        return advancedStreamingService.streamWithMonitoring(request.message());
    }
    
    /**
     * Multi-turn conversation streaming
     */
    @PostMapping(value = "/conversation", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<StreamingChatService.ConversationResponse> streamConversation(
            @RequestBody StreamingChatService.ConversationRequest request) {
        return streamingChatService.streamConversation(request);
    }
    
    /**
     * Parallel streaming
     */
    @PostMapping(value = "/chat/parallel", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<AdvancedStreamingService.ParallelResponse> streamChatParallel(
            @RequestBody ParallelChatRequest request) {
        return advancedStreamingService.streamParallel(request.messages());
    }
    
    /**
     * Stream a chat and collect the result
     */
    @PostMapping(value = "/chat/collect", produces = MediaType.APPLICATION_JSON_VALUE)
    public Mono<CollectedResponse> streamChatAndCollect(@RequestBody ChatRequest request) {
        return streamingChatService.streamChatAndCollect(request.message())
            .map(content -> new CollectedResponse(content, content.length()));
    }
    
    // Request and response records
    public record ChatRequest(String message) {}
    
    public record ParallelChatRequest(List<String> messages) {}
    
    public record CollectedResponse(String content, int length) {}
}

7.5.2 Asynchronous Controller

// AsyncController.java
package com.example.springai.controller;

import com.example.springai.service.AsyncChatService;
import com.example.springai.service.ReactiveChatService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@RestController
@RequestMapping("/api/async")
public class AsyncController {
    
    private final AsyncChatService asyncChatService;
    private final ReactiveChatService reactiveChatService;
    
    public AsyncController(AsyncChatService asyncChatService,
                         ReactiveChatService reactiveChatService) {
        this.asyncChatService = asyncChatService;
        this.reactiveChatService = reactiveChatService;
    }
    
    /**
     * Asynchronous chat
     */
    @PostMapping("/chat")
    public CompletableFuture<ResponseEntity<String>> chatAsync(
            @RequestBody ChatRequest request) {
        return asyncChatService.chatAsync(request.message())
            .thenApply(ResponseEntity::ok)
            .exceptionally(throwable -> 
                ResponseEntity.badRequest().body("错误: " + throwable.getMessage()));
    }
    
    /**
     * Batch asynchronous chat
     */
    @PostMapping("/chat/batch")
    public CompletableFuture<List<AsyncChatService.AsyncChatResult>> chatBatchAsync(
            @RequestBody BatchChatRequest request) {
        return asyncChatService.chatBatchAsync(request.messages());
    }
    
    /**
     * Asynchronous chat with a timeout
     */
    @PostMapping("/chat/timeout")
    public CompletableFuture<ResponseEntity<String>> chatAsyncWithTimeout(
            @RequestBody ChatRequest request,
            @RequestParam(defaultValue = "30000") long timeoutMs) {
        return asyncChatService.chatAsyncWithTimeout(request.message(), timeoutMs)
            .thenApply(ResponseEntity::ok)
            .exceptionally(throwable -> 
                ResponseEntity.badRequest().body("超时或错误: " + throwable.getMessage()));
    }
    
    /**
     * Chained asynchronous chat
     */
    @PostMapping("/chat/chain")
    public CompletableFuture<String> chatChainAsync(
            @RequestBody ChainChatRequest request) {
        return asyncChatService.chatChainAsync(request.messages());
    }
    
    /**
     * Merged asynchronous chat
     */
    @PostMapping("/chat/merge")
    public CompletableFuture<AsyncChatService.MergedChatResult> chatMergeAsync(
            @RequestBody BatchChatRequest request) {
        return asyncChatService.chatMergeAsync(request.messages());
    }
    
    /**
     * Reactive chat
     */
    @PostMapping("/reactive/chat")
    public Mono<String> chatReactive(@RequestBody ChatRequest request) {
        return reactiveChatService.chatReactive(request.message());
    }
    
    /**
     * Reactive batch chat
     */
    @PostMapping("/reactive/batch")
    public Flux<ReactiveChatService.ReactiveChatResult> chatBatchReactive(
            @RequestBody BatchChatRequest request) {
        return reactiveChatService.chatBatchReactive(request.messages());
    }
    
    /**
     * Reactive streaming chat
     */
    @PostMapping("/reactive/stream")
    public Flux<String> chatStreamReactive(@RequestBody ChatRequest request) {
        return reactiveChatService.chatStreamReactive(request.message());
    }
    
    /**
     * Reactive chat with retry
     */
    @PostMapping("/reactive/retry")
    public Mono<String> chatReactiveWithRetry(@RequestBody ChatRequest request) {
        return reactiveChatService.chatReactiveWithRetry(request.message());
    }
    
    /**
     * Reactive chat with a timeout
     */
    @PostMapping("/reactive/timeout")
    public Mono<String> chatReactiveWithTimeout(
            @RequestBody ChatRequest request,
            @RequestParam(defaultValue = "30") int timeoutSeconds) {
        return reactiveChatService.chatReactiveWithTimeout(
            request.message(), Duration.ofSeconds(timeoutSeconds));
    }
    
    /**
     * Reactive aggregation
     */
    @PostMapping("/reactive/aggregate")
    public Mono<ReactiveChatService.AggregatedResult> chatAggregateReactive(
            @RequestBody BatchChatRequest request) {
        return reactiveChatService.chatAggregateReactive(request.messages());
    }
    
    // Request records
    public record ChatRequest(String message) {}
    
    public record BatchChatRequest(List<String> messages) {}
    
    public record ChainChatRequest(List<String> messages) {}
}

7.6 Configuration

7.6.1 Application Configuration

# application.yml
spring:
  ai:
    openai:
      api-key: ${OPENAI_API_KEY}
      base-url: https://api.openai.com
      chat:
        options:
          model: gpt-3.5-turbo
          temperature: 0.7
          max-tokens: 1000
          stream: true
    
  # WebSocket configuration
  websocket:
    allowed-origins: "*"
    
  # Async task executor configuration
  task:
    execution:
      pool:
        core-size: 8
        max-size: 20
        queue-capacity: 100
        keep-alive: 60s
      thread-name-prefix: "async-chat-"
    
  # Reactor Netty configuration
  reactor:
    netty:
      pool:
        max-connections: 100
        max-idle-time: 30s

# Streaming configuration (custom properties, bound by StreamingProperties)
streaming:
  chat:
    timeout: 30s
    buffer-size: 1000
    backpressure-strategy: buffer
    retry:
      max-attempts: 3
      delay: 1s
      max-delay: 10s
    
  websocket:
    max-sessions: 1000
    session-timeout: 300s
    
  monitoring:
    enabled: true
    metrics-interval: 5s

# Async processing configuration (custom properties)
async:
  chat:
    thread-pool-size: 10
    timeout: 60s
    retry:
      enabled: true
      max-attempts: 3
      delay: 2s
    
  batch:
    max-size: 50
    timeout: 120s
    parallel-degree: 5

# Logging configuration
logging:
  level:
    com.example.springai.service: DEBUG
    reactor.netty: INFO
    org.springframework.web.socket: DEBUG
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"

7.6.2 Streaming Configuration Properties

// StreamingProperties.java
package com.example.springai.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.time.Duration;

@Component
@ConfigurationProperties(prefix = "streaming")
public class StreamingProperties {
    
    private Chat chat = new Chat();
    private WebSocket websocket = new WebSocket();
    private Monitoring monitoring = new Monitoring();
    
    // Getters and Setters
    public Chat getChat() { return chat; }
    public void setChat(Chat chat) { this.chat = chat; }
    
    public WebSocket getWebsocket() { return websocket; }
    public void setWebsocket(WebSocket websocket) { this.websocket = websocket; }
    
    public Monitoring getMonitoring() { return monitoring; }
    public void setMonitoring(Monitoring monitoring) { this.monitoring = monitoring; }
    
    public static class Chat {
        private Duration timeout = Duration.ofSeconds(30);
        private int bufferSize = 1000;
        private String backpressureStrategy = "buffer";
        private Retry retry = new Retry();
        
        // Getters and Setters
        public Duration getTimeout() { return timeout; }
        public void setTimeout(Duration timeout) { this.timeout = timeout; }
        
        public int getBufferSize() { return bufferSize; }
        public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; }
        
        public String getBackpressureStrategy() { return backpressureStrategy; }
        public void setBackpressureStrategy(String backpressureStrategy) { 
            this.backpressureStrategy = backpressureStrategy; 
        }
        
        public Retry getRetry() { return retry; }
        public void setRetry(Retry retry) { this.retry = retry; }
    }
    
    public static class WebSocket {
        private int maxSessions = 1000;
        private Duration sessionTimeout = Duration.ofMinutes(5);
        
        // Getters and Setters
        public int getMaxSessions() { return maxSessions; }
        public void setMaxSessions(int maxSessions) { this.maxSessions = maxSessions; }
        
        public Duration getSessionTimeout() { return sessionTimeout; }
        public void setSessionTimeout(Duration sessionTimeout) { 
            this.sessionTimeout = sessionTimeout; 
        }
    }
    
    public static class Monitoring {
        private boolean enabled = true;
        private Duration metricsInterval = Duration.ofSeconds(5);
        
        // Getters and Setters
        public boolean isEnabled() { return enabled; }
        public void setEnabled(boolean enabled) { this.enabled = enabled; }
        
        public Duration getMetricsInterval() { return metricsInterval; }
        public void setMetricsInterval(Duration metricsInterval) { 
            this.metricsInterval = metricsInterval; 
        }
    }
    
    public static class Retry {
        private int maxAttempts = 3;
        private Duration delay = Duration.ofSeconds(1);
        private Duration maxDelay = Duration.ofSeconds(10);
        
        // Getters and Setters
        public int getMaxAttempts() { return maxAttempts; }
        public void setMaxAttempts(int maxAttempts) { this.maxAttempts = maxAttempts; }
        
        public Duration getDelay() { return delay; }
        public void setDelay(Duration delay) { this.delay = delay; }
        
        public Duration getMaxDelay() { return maxDelay; }
        public void setMaxDelay(Duration maxDelay) { this.maxDelay = maxDelay; }
    }
}

7.7 Chapter Summary

7.7.1 Key Points

  1. Advantages of streaming

    • Real-time feedback and a better user experience
    • Lower time to first byte (TTFB)
    • Less memory usage and network buffering
    • Well suited to long-form text generation
  2. Spring AI streaming architecture

    • The StreamingChatModel interface
    • Reactive stream processing with Flux
    • Backpressure handling and error recovery
    • Accumulation and monitoring of streamed responses
  3. Asynchronous programming patterns

    • Asynchronous processing with CompletableFuture
    • Reactive programming with Reactor
    • Parallel processing and batch operations
    • Timeout and retry mechanisms
  4. WebSocket integration

    • Real-time bidirectional communication
    • Session management and resource cleanup
    • Error handling and connection monitoring
    • Message serialization and deserialization

7.7.2 Best Practices

  1. Streaming optimizations (see the sketch after this list)

    • Size buffers sensibly
    • Apply an explicit backpressure strategy
    • Monitor streaming performance
    • Handle network interruptions and reconnection
  2. Asynchronous programming guidelines

    • Avoid blocking operations
    • Configure thread pools appropriately
    • Implement timeout and cancellation mechanisms
    • Handle exceptions and recover from errors
  3. Resource management

    • Dispose of subscriptions promptly
    • Limit the number of concurrent connections
    • Monitor memory usage
    • Implement graceful shutdown
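The sketch below is illustrative only (streamingChatService, userMessage, ui, and log are assumptions rather than code from this chapter) and combines several of these practices: a bounded backpressure buffer, a timeout, and explicit disposal of the subscription.

// Illustrative: bounded buffering, a timeout, and subscription cleanup in one pipeline.
Disposable subscription = streamingChatService.streamChat(userMessage)
    .onBackpressureBuffer(1024)                       // bounded buffer instead of unbounded growth
    .timeout(Duration.ofSeconds(30))                  // fail fast if the model stalls
    .doFinally(signal -> log.info("stream ended: {}", signal))
    .subscribe(
        ui::appendChunk,
        error -> ui.showError(error.getMessage())
    );

// Graceful shutdown: release the upstream subscription when the client disconnects.
subscription.dispose();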

7.7.3 Exercises

  1. Basic exercises

    • Implement a simple streaming chat service
    • Build a WebSocket chat-room feature
    • Add monitoring and statistics for streamed responses
  2. Intermediate exercises

    • Implement parallel streaming across multiple models
    • Create an adaptive backpressure strategy
    • Develop a caching mechanism for streamed responses
  3. Advanced exercises

    • Build a distributed streaming system
    • Implement A/B testing for streamed responses
    • Develop an algorithm that intelligently optimizes streamed responses
  4. Hands-on projects

    • Build a real-time AI assistant application
    • Build a collaborative multi-user AI writing platform
    • Develop an AI-powered real-time translation system