WebSocket(WebSocket音频数据传输实践)

WebSocket(WebSocket音频数据传输实践)
WebSocket音频数据传输实践

WebSocket支持二进制数据传输,因此用它来传输音视频数据是可行的。

那么什么场景下会用呢?

比如常见的音视频通话,不管你是跟真人对话还是跟AI对话,总要涉及语音交互,对于这种场景,现在也有WebRTC方案或WebRTC与WebSocket结合的方案。

今天我们来看一下直接使用WebSocket进行音频数据传输的实现流程。

WebSocket(WebSocket音频数据传输实践)

框架及基本使用

WebSocket在Spring框架中的集成越来越简化了,像下面这样

@Configurationpublic class WebSocketConfig {    @Bean    public ServerEndpointExporter serverEndpointExporter() {        return new ServerEndpointExporter();    }}

然后定义endpoint就可以使用

@Component@ServerEndpoint(value = "/websocket/realtimeCall/{dialogId}")public class RealtimeCallServer {    private static final ConcurrentHashMap sessionPools = new ConcurrentHashMap<>();    /**     * 连接建立成功调用的方法     */    @OnOpen    public void onOpen(Session session, @PathParam(value = "dialogId") String dialogId) throws Exception {        if (sessionPools.get(dialogId) != null) {            sessionPools.get(dialogId).close();            sessionPools.remove(dialogId);        }        sessionPools.put(dialogId, session);    }    /**     * 连接关闭时处理     */    @OnClose    public void onClose(@PathParam(value = "dialogId") String dialogId) throws IOException {        sessionPools.get(dialogId).close();        sessionPools.remove(dialogId);    }    /**     * 抛出异常时处理     */    @OnError    public void onError(Session session, Throwable exception) {        logger.error("realtimeCall--WebSocketServer连接异常", exception);    }    /**     * 服务器接收到客户端消息时调用的方法     */    @OnMessage(maxMessageSize = 52428800)    public void onMessage(@PathParam(value = "dialogId") String dialogId, byte[] buffer) {        // TODO 业务处理        // eg. 保存语音文件       String voiceFilePath = basePath + "/" + dialogId + ".mp3";       FileUtil.writeBytes(convertPcmToMp3(voiceData, 16000, 1, 16), voiceFilePath);    }    private byte[] convertPcmToMp3(byte[] pcmData, int sampleRate, int channels, int bitsPerSample) throws Exception {        // Create temporary PCM file        Path tempPcmPath = Files.createTempFile("temp_pcm", ".raw");        try (FileOutputStream fos = new FileOutputStream(tempPcmPath.toFile())) {            fos.write(pcmData);        }        // Create temporary MP3 file        Path tempMp3Path = Files.createTempFile("temp_mp3", ".mp3");        // Construct command line arguments        String[] cmd = {                "ffmpeg",                "-y", // Overwrite output files without asking                "-f", "s" + bitsPerSample + "le", // Sample format (little-endian)                "-ar", String.valueOf(sampleRate), // Sample rate                "-ac", String.valueOf(channels), // Number of audio channels                "-i", tempPcmPath.toString(),                "-b:a", "128k", // Bitrate                tempMp3Path.toString()        };        // Execute FFmpeg command        ProcessBuilder processBuilder = new ProcessBuilder(cmd);        Process process = processBuilder.start();        // Wait for the process to complete        int exitCode = process.waitFor();        if (exitCode != 0) {            throw new RuntimeException("FFmpeg conversion failed.");        }        // Read the MP3 data into a byte array        byte[] mp3Data = Files.readAllBytes(tempMp3Path);        // Delete temporary files        Files.deleteIfExists(tempPcmPath);        Files.deleteIfExists(tempMp3Path);        return mp3Data;    }}

这里我们将收到的语音保存成了mp3文件。注意设置maxMessageSize参数,防止超过传输默认长度限制。

例子中直接传输了字节流,但实际上我们可能需要附加一些信息(不方便加到path上的情况),也就是需要传输结构化数据,将音频流作为字段放到其中。

传输结构化数据

这里有一些关键点,首先WebSocket对结构化数据的传输需要自定义编解码器,因为本质上还是将其当做字符串来传输的,那么就带来另一个问题,json字符串中不好嵌入流数据,所以我们要将字节流进行base64编码,然后在接收方手动解码。这一点没找到特别好的解决办法,传输大数据量时可能出现问题。

那么代码就变成下面这样

@Component@ServerEndpoint(value = "/websocket/realtimeCall/{dialogId}", decoders = {RealtimeCallParamDecoder.class})public class RealtimeCallServer {    private static final ConcurrentHashMap sessionPools = new ConcurrentHashMap<>();    /**     * 连接建立成功调用的方法     */    @OnOpen    public void onOpen(Session session, @PathParam(value = "dialogId") String dialogId) throws Exception {        if (sessionPools.get(dialogId) != null) {            sessionPools.get(dialogId).close();            sessionPools.remove(dialogId);        }        sessionPools.put(dialogId, session);    }    /**     * 连接关闭时处理     */    @OnClose    public void onClose(@PathParam(value = "dialogId") String dialogId) throws IOException {        sessionPools.get(dialogId).close();        sessionPools.remove(dialogId);    }    /**     * 抛出异常时处理     */    @OnError    public void onError(Session session, Throwable exception) {        logger.error("realtimeCall--WebSocketServer连接异常", exception);    }    /**     * 服务器接收到客户端消息时调用的方法     */    @OnMessage(maxMessageSize = 52428800)    public void onMessage(@PathParam(value = "dialogId") String dialogId, RealtimeCallParam param) {        byte[] voiceStream = Base64.getDecoder().decode(param.getVoiceBuffer());        // TODO 后续业务处理    }}

解码器

public class RealtimeCallParamDecoder implements jakarta.websocket.Decoder.Text {    private static final Logger logger = LoggerFactory.getLogger(RealtimeCallParamDecoder.class);    @Override    public void destroy() {    }    @Override    public void init(EndpointConfig arg0) {    }    @SneakyThrows    @Override    public RealtimeCallParam decode(String param) {        if (StringUtils.isEmpty(param)) {            return null;        }        return new ObjectMapper().readValue(param, RealtimeCallParam.class);    }    @Override    public boolean willDecode(String arg0) {        return true;    }}

结构化参数

@Data@Builder@AllArgsConstructor@NoArgsConstructorpublic class RealtimeCallParam {    private Long userId;    private String chatId;    /**     * 音频流base64     */    private String voiceBuffer;}

这样我们就完成了一个基于WebSocket的音频数据传输,当然说成流传输更合适,音频只是赋予它的使用场景。

文章版权声明:除非注明,否则均为边学边练网络文章,版权归原作者所有