3.1 生产者基础

3.1.1 生产者概述

from dataclasses import dataclass
from typing import List, Dict, Optional, Callable
from enum import Enum
import time
import threading
import json
import hashlib
from abc import ABC, abstractmethod

class ProducerState(Enum):
    """Lifecycle states of a producer."""
    CREATE_JUST = "CREATE_JUST"  # initial state assigned by ProducerLifecycleManager.__init__
    RUNNING = "RUNNING"  # assigned by ProducerLifecycleManager.start()
    SHUTDOWN_ALREADY = "SHUTDOWN_ALREADY"  # assigned by ProducerLifecycleManager.shutdown()
    SERVICE_NOT_OK = "SERVICE_NOT_OK"  # never assigned in the code visible here

class SendStatus(Enum):
    """Status carried by a SendResult."""
    SEND_OK = "SEND_OK"  # the only status the producer code visible here ever reports
    # The remaining members are not produced anywhere in the visible code;
    # presumably they mirror broker-side flush/replication outcomes — TODO confirm.
    FLUSH_DISK_TIMEOUT = "FLUSH_DISK_TIMEOUT"
    FLUSH_SLAVE_TIMEOUT = "FLUSH_SLAVE_TIMEOUT"
    SLAVE_NOT_AVAILABLE = "SLAVE_NOT_AVAILABLE"

class CommunicationMode(Enum):
    """How a send request is issued."""
    SYNC = "SYNC"  # the mode SyncProducer passes to its kernel send
    ASYNC = "ASYNC"  # not referenced by the code visible here
    ONEWAY = "ONEWAY"  # not referenced by the code visible here

@dataclass
class SendResult:
    """Outcome of one send operation."""
    send_status: SendStatus  # status of the send
    msg_id: str  # message id generated on the client side
    message_queue: 'MessageQueue'  # queue the message was sent to
    queue_offset: int  # "queue_offset" value taken from the broker reply
    transaction_id: Optional[str] = None  # not populated by the code visible here
    offset_msg_id: Optional[str] = None  # "offset_msg_id" from the broker reply, if any
    region_id: Optional[str] = None  # "region_id" from the broker reply, if any
    trace_on: bool = True  # presumably a tracing switch; never read in the visible code

@dataclass
class MessageQueue:
    """Identify one queue of a topic on a specific broker.

    Instances are hashable so they can serve as dictionary keys. Equality and
    hashing are both derived from ``(topic, broker_name, queue_id)``, so the
    fields should not be mutated while an instance is used as a key.
    """
    topic: str
    broker_name: str
    queue_id: int

    def __hash__(self):
        # Hash the same tuple that __eq__ compares so both always agree.
        return hash((self.topic, self.broker_name, self.queue_id))

    def __eq__(self, other):
        if not isinstance(other, MessageQueue):
            # Let Python try the reflected comparison before settling on False.
            return NotImplemented
        return ((self.topic, self.broker_name, self.queue_id) ==
                (other.topic, other.broker_name, other.queue_id))

@dataclass
class Message:
    """A message addressed to a topic, with optional metadata and properties.

    ``properties`` is a free-form string map; the delay level and the
    wait-for-store flag are stored in it under the keys ``"DELAY"`` and
    ``"WAIT"``.
    """
    topic: str
    body: bytes
    tags: Optional[str] = None
    keys: Optional[str] = None
    flag: int = 0
    properties: Optional[Dict[str, str]] = None
    transaction_id: Optional[str] = None

    def __post_init__(self):
        # Give every instance its own dict when the caller supplied none.
        if self.properties is None:
            self.properties = {}

    def put_property(self, key: str, value: str) -> None:
        """Store ``value`` under ``key``, replacing any previous value."""
        self.properties[key] = value

    def get_property(self, key: str) -> Optional[str]:
        """Return the value stored under ``key``, or ``None`` when absent."""
        return self.properties.get(key)

    def clear_property(self, key: str) -> None:
        """Remove ``key`` if present; a missing key is silently ignored."""
        self.properties.pop(key, None)

    def set_delay_time_level(self, level: int) -> None:
        """Record the delay level as the ``"DELAY"`` property."""
        self.put_property("DELAY", str(level))

    def get_delay_time_level(self) -> int:
        """Return the stored delay level, or 0 when none is set."""
        raw_level = self.get_property("DELAY")
        if not raw_level:
            return 0
        return int(raw_level)

    def set_wait_store_msg_ok(self, wait: bool) -> None:
        """Record the wait-for-store flag as the ``"WAIT"`` property."""
        self.put_property("WAIT", str(wait).lower())

    def is_wait_store_msg_ok(self) -> bool:
        """Return the wait-for-store flag; it defaults to ``True`` when unset."""
        raw_flag = self.get_property("WAIT")
        if not raw_flag:
            return True
        return raw_flag == "true"

class SendCallback(ABC):
    """Interface for objects notified about the outcome of a send.

    Concrete subclasses must implement both hooks.
    """

    @abstractmethod
    def on_success(self, send_result: SendResult) -> None:
        """Handle a successful send described by ``send_result``."""

    @abstractmethod
    def on_exception(self, exception: Exception) -> None:
        """Handle a send that failed with ``exception``."""

class MessageQueueSelector(ABC):
    """Interface for strategies that pick the queue a message is sent to."""

    @abstractmethod
    def select(self, mqs: List[MessageQueue], msg: Message, arg: object) -> MessageQueue:
        """Return one queue from ``mqs`` for ``msg``.

        ``arg`` is an extra, strategy-specific value supplied by the caller.
        """

class SelectMessageQueueByHash(MessageQueueSelector):
    """Pick a queue from a stable hash of a sharding key.

    The same key always maps to the same index for a given queue count, also
    across processes and restarts. The built-in ``hash()`` cannot provide
    that for strings because it is salted per interpreter process.
    """

    def select(self, mqs: List[MessageQueue], msg: Message, arg: object) -> MessageQueue:
        """Return the queue selected by the sharding key.

        The key is ``str(arg)`` when ``arg`` is not ``None``; otherwise it is
        ``msg.keys`` (or the empty string when the message has no keys).

        Raises:
            ValueError: if ``mqs`` is empty.
        """
        if not mqs:
            raise ValueError("消息队列列表不能为空")

        # Test against None so falsy but meaningful keys (0, "") are honoured
        # instead of silently falling back to msg.keys.
        sharding_key = str(arg) if arg is not None else (msg.keys or "")

        # Use a content-based digest so the key-to-queue mapping is reproducible.
        digest = hashlib.sha256(sharding_key.encode("utf-8")).digest()
        index = int.from_bytes(digest[:8], "big") % len(mqs)
        return mqs[index]

class SelectMessageQueueByRandom(MessageQueueSelector):
    """Pick a queue uniformly at random."""

    def __init__(self):
        # The random module is imported lazily and kept on the instance.
        import random
        self.random = random

    def select(self, mqs: List[MessageQueue], msg: Message, arg: object) -> MessageQueue:
        """Return a randomly chosen queue; ``msg`` and ``arg`` are ignored.

        Raises:
            ValueError: if ``mqs`` is empty.
        """
        if not mqs:
            raise ValueError("消息队列列表不能为空")

        last_index = len(mqs) - 1
        return mqs[self.random.randint(0, last_index)]

class SelectMessageQueueByMachineRoom(MessageQueueSelector):
    """Prefer queues whose broker name contains the configured IDC string."""

    def __init__(self, consumer_idc: str):
        self.consumer_idc = consumer_idc

    def select(self, mqs: List[MessageQueue], msg: Message, arg: object) -> MessageQueue:
        """Return a random queue, restricted to the same IDC when possible.

        A queue counts as "same IDC" when ``consumer_idc`` is a substring of
        its ``broker_name``. When no queue qualifies, the choice is made from
        the full list.

        Raises:
            ValueError: if ``mqs`` is empty.
        """
        if not mqs:
            raise ValueError("消息队列列表不能为空")

        import random

        local_queues = [
            candidate for candidate in mqs
            if self.consumer_idc in candidate.broker_name
        ]
        # Fall back to every queue when none belongs to the local IDC.
        return random.choice(local_queues or mqs)

@dataclass
class ProducerConfig:
    """Configuration for a producer."""
    producer_group: str
    nameserver_addr: str
    instance_name: str = "DEFAULT"
    client_ip: str = "127.0.0.1"
    send_msg_timeout: int = 3000  # send timeout in milliseconds
    compress_msg_body_over_howmuch: int = 4096  # body size above which messages are compressed
    retry_times_when_send_failed: int = 2  # retries after a failed synchronous send
    retry_times_when_send_async_failed: int = 2  # retries after a failed asynchronous send
    retry_another_broker_when_not_store_ok: bool = False  # retry on another broker when storing fails
    max_message_size: int = 1024 * 1024 * 4  # maximum message body size (4 MB)
    default_topic_queue_nums: int = 4  # number of queues created for an unknown topic
    send_latency_fault_enable: bool = False  # whether latency-based fault tolerance is enabled

    def validate(self) -> None:
        """Check the configuration and raise on the first invalid value.

        Raises:
            ValueError: if the group or name server address is empty, if the
                timeout, maximum message size or default queue count is not
                positive, or if a retry count is negative.
        """
        if not self.producer_group:
            raise ValueError("生产者组不能为空")

        if not self.nameserver_addr:
            raise ValueError("NameServer地址不能为空")

        if self.send_msg_timeout <= 0:
            raise ValueError("发送超时时间必须大于0")

        if self.max_message_size <= 0:
            raise ValueError("最大消息大小必须大于0")

        if self.default_topic_queue_nums <= 0:
            raise ValueError("默认Topic队列数必须大于0")

        # A negative retry count would make the send loop iterate over an
        # empty range, so the message would fail without a single attempt.
        if self.retry_times_when_send_failed < 0:
            raise ValueError("同步发送失败重试次数不能为负数")

        if self.retry_times_when_send_async_failed < 0:
            raise ValueError("异步发送失败重试次数不能为负数")

class ProducerException(Exception):
    """Base class for all producer errors."""


class MQClientException(ProducerException):
    """Error raised on the client side of the MQ."""


class MQBrokerException(ProducerException):
    """Error reported by a broker, carrying its response code."""

    def __init__(self, response_code: int, error_message: str):
        self.response_code = response_code
        self.error_message = error_message
        # The error message doubles as the exception's string form.
        super().__init__(error_message)


class RemotingException(ProducerException):
    """Error raised when a remote call fails."""


class InterruptedException(ProducerException):
    """Error signalling that an operation was interrupted."""

3.1.2 生产者生命周期管理

class ProducerLifecycleManager:
    """Track a producer's lifecycle state and notify listeners of transitions.

    State changes and listener registration are serialised by a re-entrant
    lock. Listeners are invoked while that lock is held, so a listener may
    call back into this manager from the same thread.
    """

    def __init__(self, config: ProducerConfig):
        self.config = config
        self.state = ProducerState.CREATE_JUST
        self.start_time: Optional[int] = None  # epoch milliseconds, set by start()
        self.shutdown_time: Optional[int] = None  # epoch milliseconds, set by shutdown()
        self.lock = threading.RLock()
        self.listeners: List[Callable[[ProducerState, ProducerState], None]] = []

    def add_state_listener(self, listener: Callable[[ProducerState, ProducerState], None]) -> None:
        """Register ``listener``; it is called as ``listener(old_state, new_state)``."""
        with self.lock:
            self.listeners.append(listener)

    def remove_state_listener(self, listener: Callable[[ProducerState, ProducerState], None]) -> None:
        """Unregister ``listener``; an unknown listener is ignored."""
        with self.lock:
            if listener in self.listeners:
                self.listeners.remove(listener)

    def _notify_state_change(self, old_state: ProducerState, new_state: ProducerState) -> None:
        """Call every registered listener, isolating listener failures."""
        # Work on a snapshot: a listener may add or remove listeners from
        # inside its callback, which must not disturb this iteration.
        with self.lock:
            snapshot = list(self.listeners)

        for listener in snapshot:
            try:
                listener(old_state, new_state)
            except Exception as e:
                # One failing listener must not prevent the others from running.
                print(f"状态监听器执行异常: {e}")

    def start(self) -> None:
        """Move from CREATE_JUST to RUNNING.

        Calling it again while RUNNING is a no-op.

        Raises:
            ProducerException: if the producer is in any other state.
        """
        with self.lock:
            if self.state == ProducerState.CREATE_JUST:
                old_state = self.state
                self.state = ProducerState.RUNNING
                self.start_time = int(time.time() * 1000)
                self._notify_state_change(old_state, self.state)
                print(f"生产者启动成功,组: {self.config.producer_group}")
            elif self.state == ProducerState.RUNNING:
                print("生产者已经在运行中")
            else:
                raise ProducerException(f"生产者状态异常,无法启动: {self.state}")

    def shutdown(self) -> None:
        """Move to SHUTDOWN_ALREADY from any state; repeated calls are no-ops."""
        with self.lock:
            if self.state == ProducerState.RUNNING:
                old_state = self.state
                self.state = ProducerState.SHUTDOWN_ALREADY
                self.shutdown_time = int(time.time() * 1000)
                self._notify_state_change(old_state, self.state)
                print(f"生产者关闭成功,组: {self.config.producer_group}")
            elif self.state == ProducerState.SHUTDOWN_ALREADY:
                print("生产者已经关闭")
            else:
                print(f"生产者状态: {self.state},执行关闭操作")
                old_state = self.state
                self.state = ProducerState.SHUTDOWN_ALREADY
                self.shutdown_time = int(time.time() * 1000)
                # This is a real transition too, so listeners must hear about it.
                self._notify_state_change(old_state, self.state)

    def is_running(self) -> bool:
        """Return whether the current state is RUNNING."""
        return self.state == ProducerState.RUNNING

    def get_state(self) -> ProducerState:
        """Return the current state."""
        return self.state

    def get_uptime(self) -> int:
        """Return the running time in milliseconds.

        The result is 0 when the producer was never started. After shutdown
        it is the span between start and shutdown.
        """
        if self.start_time is None:
            return 0

        end_time = self.shutdown_time if self.shutdown_time else int(time.time() * 1000)
        return end_time - self.start_time

    def get_lifecycle_info(self) -> Dict[str, object]:
        """Return a snapshot of state, timestamps and uptime as a dict."""
        # Hold the lock so the fields come from one consistent state.
        with self.lock:
            return {
                "state": self.state.value,
                "producer_group": self.config.producer_group,
                "start_time": self.start_time,
                "shutdown_time": self.shutdown_time,
                "uptime_ms": self.get_uptime(),
                "is_running": self.is_running()
            }

# 使用示例
if __name__ == "__main__":
    # Build a producer configuration; unspecified fields keep their defaults.
    config = ProducerConfig(
        producer_group="test_producer_group",
        nameserver_addr="192.168.1.100:9876"
    )
    
    # Validate the configuration (raises ValueError on an invalid value).
    config.validate()
    
    # Create the lifecycle manager; it starts in the CREATE_JUST state.
    lifecycle_manager = ProducerLifecycleManager(config)
    
    # Register a listener that prints every state transition.
    def state_listener(old_state: ProducerState, new_state: ProducerState):
        print(f"生产者状态变化: {old_state.value} -> {new_state.value}")
    
    lifecycle_manager.add_state_listener(state_listener)
    
    try:
        # Start the producer (CREATE_JUST -> RUNNING).
        lifecycle_manager.start()
        
        # Simulate two seconds of running time.
        time.sleep(2)
        
        # Print the lifecycle snapshot as JSON.
        info = lifecycle_manager.get_lifecycle_info()
        print(f"生命周期信息: {json.dumps(info, indent=2)}")
        
    finally:
        # Always shut the producer down, even if a step above raised.
        lifecycle_manager.shutdown()

3.2 同步发送

3.2.1 同步发送实现

class SyncProducer:
    """Producer that sends messages synchronously and retries failed attempts.

    Routing and broker communication are simulated: queues for an unknown
    topic are synthesised on demand, and ``BrokerClient`` fabricates the
    broker reply.
    """

    def __init__(self, config: ProducerConfig):
        self.config = config
        self.lifecycle_manager = ProducerLifecycleManager(config)
        self.message_queue_selector = SelectMessageQueueByRandom()
        self.send_statistics = SendStatistics()

        # Simulated route table (topic -> queues) and broker client.
        self.topic_route_table: Dict[str, List[MessageQueue]] = {}
        self.broker_client = BrokerClient(config.nameserver_addr)

    def start(self) -> None:
        """Validate the configuration and start the producer."""
        self.config.validate()
        self.lifecycle_manager.start()
        self._update_topic_route_info()

    def shutdown(self) -> None:
        """Shut down the lifecycle manager and the broker client."""
        self.lifecycle_manager.shutdown()
        self.broker_client.shutdown()

    def _update_topic_route_info(self) -> None:
        """Refresh topic routes (placeholder; a real client would query the name server)."""
        pass

    def _get_topic_publish_info(self, topic: str) -> List[MessageQueue]:
        """Return the queues for ``topic``, creating default ones on first use."""
        if topic not in self.topic_route_table:
            # Simulate route discovery: every queue lives on "broker-a".
            self.topic_route_table[topic] = [
                MessageQueue(topic=topic, broker_name="broker-a", queue_id=i)
                for i in range(self.config.default_topic_queue_nums)
            ]

        return self.topic_route_table[topic]

    def _validate_message(self, msg: Message) -> None:
        """Reject messages with an empty or illegal topic, or an empty or oversized body.

        Raises:
            MQClientException: describing the first violated rule.
        """
        if not msg.topic:
            raise MQClientException("消息Topic不能为空")

        if not msg.body:
            raise MQClientException("消息体不能为空")

        if len(msg.body) > self.config.max_message_size:
            raise MQClientException(f"消息体大小超过限制: {len(msg.body)} > {self.config.max_message_size}")

        if not self._is_valid_topic_name(msg.topic):
            raise MQClientException(f"非法的Topic名称: {msg.topic}")

    def _is_valid_topic_name(self, topic: str) -> bool:
        """Return whether ``topic`` is 1-127 characters of letters, digits, ``_`` or ``-``."""
        if not topic or len(topic) > 127:
            return False

        import re
        # fullmatch is required: with match() a trailing "$" also matches just
        # before a final newline, so "topic\n" would be accepted.
        return re.fullmatch(r'[a-zA-Z0-9_-]+', topic) is not None

    def _compress_message(self, msg: Message) -> Message:
        """Return a zlib-compressed copy of ``msg`` when its body exceeds the threshold.

        The copy is tagged with the property ``COMPRESSED = "true"``; the
        original message is never modified. Bodies at or below the threshold
        are returned unchanged.
        """
        if len(msg.body) > self.config.compress_msg_body_over_howmuch:
            import zlib
            compressed_body = zlib.compress(msg.body)

            # Build a new message so the caller's object stays untouched.
            compressed_msg = Message(
                topic=msg.topic,
                body=compressed_body,
                tags=msg.tags,
                keys=msg.keys,
                flag=msg.flag,
                properties=msg.properties.copy() if msg.properties else {},
                transaction_id=msg.transaction_id
            )

            compressed_msg.put_property("COMPRESSED", "true")
            return compressed_msg

        return msg

    def send(self, msg: Message, timeout: Optional[int] = None) -> SendResult:
        """Send ``msg`` synchronously, retrying failed attempts.

        Args:
            msg: message to send.
            timeout: per-attempt timeout in milliseconds; the configured
                ``send_msg_timeout`` is used when ``None``.

        Raises:
            ProducerException: if the producer is not running.
            MQClientException: if the message is invalid, or every attempt
                failed (the last failure is attached as ``__cause__``).
        """
        if not self.lifecycle_manager.is_running():
            raise ProducerException("生产者未启动")

        self._validate_message(msg)
        msg = self._compress_message(msg)

        send_timeout = timeout if timeout is not None else self.config.send_msg_timeout

        # Clamp so that a negative configuration still yields one attempt.
        max_retries = max(self.config.retry_times_when_send_failed, 0)

        last_exception = None
        for retry_count in range(max_retries + 1):
            try:
                return self._send_default_impl(msg, send_timeout, retry_count)
            except Exception as e:
                last_exception = e
                if retry_count < max_retries:
                    print(f"发送失败,第{retry_count + 1}次重试: {e}")
                    time.sleep(0.1)  # brief pause before the next attempt

        # Each failed attempt was already recorded by _send_default_impl, so
        # recording another failure here would count the message twice.
        raise MQClientException(f"发送消息失败,已重试{max_retries}次") from last_exception

    def _send_default_impl(self, msg: Message, timeout: int, retry_count: int) -> SendResult:
        """Run one send attempt: pick a queue, send, and record the outcome."""
        start_time = time.time()

        try:
            message_queues = self._get_topic_publish_info(msg.topic)
            if not message_queues:
                raise MQClientException(f"没有找到Topic的路由信息: {msg.topic}")

            selected_queue = self.message_queue_selector.select(message_queues, msg, None)

            send_result = self._send_kernel_impl(
                msg=msg,
                mq=selected_queue,
                communication_mode=CommunicationMode.SYNC,
                timeout=timeout
            )

            send_time = int((time.time() - start_time) * 1000)
            self.send_statistics.record_send_success(send_time)

            return send_result

        except Exception:
            send_time = int((time.time() - start_time) * 1000)
            self.send_statistics.record_send_failure(send_time)
            # A bare raise keeps the original traceback intact.
            raise

    def _send_kernel_impl(self, msg: Message, mq: MessageQueue,
                         communication_mode: CommunicationMode, timeout: int) -> SendResult:
        """Send ``msg`` to the broker owning ``mq`` and build the SendResult.

        ``communication_mode`` is accepted for interface symmetry but not used
        by this simulated implementation.

        Raises:
            RemotingException: wrapping any failure of the broker call.
        """
        msg_id = self._generate_message_id()

        try:
            # In a real client this is the network request.
            broker_result = self.broker_client.send_message(
                broker_name=mq.broker_name,
                message=msg,
                queue_id=mq.queue_id,
                timeout=timeout
            )

            return SendResult(
                send_status=SendStatus.SEND_OK,
                msg_id=msg_id,
                message_queue=mq,
                queue_offset=broker_result.get("queue_offset", 0),
                offset_msg_id=broker_result.get("offset_msg_id"),
                region_id=broker_result.get("region_id")
            )

        except Exception as e:
            # Chain explicitly so the root cause stays attached to the wrapper.
            raise RemotingException(f"发送消息到Broker失败: {e}") from e

    def _generate_message_id(self) -> str:
        """Return an id made of the millisecond timestamp plus 8 hex characters of a UUID4."""
        timestamp = int(time.time() * 1000)
        import uuid
        unique_id = str(uuid.uuid4()).replace('-', '')
        return f"{timestamp}{unique_id[:8]}"

    def send_to_queue(self, msg: Message, mq: MessageQueue, timeout: Optional[int] = None) -> SendResult:
        """Send ``msg`` to the explicitly chosen queue ``mq``, without retries.

        Raises:
            ProducerException: if the producer is not running.
            MQClientException: if the message is invalid.
            RemotingException: if the broker call fails.
        """
        if not self.lifecycle_manager.is_running():
            raise ProducerException("生产者未启动")

        self._validate_message(msg)
        msg = self._compress_message(msg)

        send_timeout = timeout if timeout is not None else self.config.send_msg_timeout

        # Record the outcome like send() does, so the statistics cover every
        # message this producer sends.
        start_time = time.time()
        try:
            send_result = self._send_kernel_impl(
                msg=msg,
                mq=mq,
                communication_mode=CommunicationMode.SYNC,
                timeout=send_timeout
            )
        except Exception:
            self.send_statistics.record_send_failure(int((time.time() - start_time) * 1000))
            raise

        self.send_statistics.record_send_success(int((time.time() - start_time) * 1000))
        return send_result

    def get_send_statistics(self) -> Dict[str, object]:
        """Return the accumulated send statistics as a dict."""
        return self.send_statistics.get_statistics()

class SendStatistics:
    """Thread-safe counters and timings for send attempts.

    All durations are in milliseconds.
    """

    def __init__(self):
        self.total_send_count = 0
        self.success_send_count = 0
        self.failure_send_count = 0
        self.total_send_time = 0  # time spent in all timed attempts, successes and failures
        self.success_send_time = 0  # time spent in successful attempts only
        self.max_send_time = 0
        self.min_send_time = float('inf')  # sentinel meaning "no success recorded yet"
        self.lock = threading.Lock()

    def record_send_success(self, send_time: int) -> None:
        """Record one successful send that took ``send_time`` milliseconds."""
        with self.lock:
            self.total_send_count += 1
            self.success_send_count += 1
            self.total_send_time += send_time
            self.success_send_time += send_time
            self.max_send_time = max(self.max_send_time, send_time)
            self.min_send_time = min(self.min_send_time, send_time)

    def record_send_failure(self, send_time: int = 0) -> None:
        """Record one failed send; ``send_time`` is added to the total when positive."""
        with self.lock:
            self.total_send_count += 1
            self.failure_send_count += 1
            if send_time > 0:
                self.total_send_time += send_time

    def get_statistics(self) -> Dict[str, object]:
        """Return a snapshot of the counters and derived figures.

        ``avg_send_time_ms``, ``max_send_time_ms`` and ``min_send_time_ms``
        describe successful sends only. ``success_rate`` is a percentage.
        Every derived figure is 0 when there is no data.
        """
        with self.lock:
            # Average over successful sends only. Dividing the combined total,
            # which includes failure time, by the success count would inflate
            # the figure.
            avg_send_time = (self.success_send_time / self.success_send_count
                             if self.success_send_count > 0 else 0)

            success_rate = (self.success_send_count / self.total_send_count * 100
                            if self.total_send_count > 0 else 0)

            return {
                "total_send_count": self.total_send_count,
                "success_send_count": self.success_send_count,
                "failure_send_count": self.failure_send_count,
                "success_rate": round(success_rate, 2),
                "avg_send_time_ms": round(avg_send_time, 2),
                "max_send_time_ms": self.max_send_time,
                "min_send_time_ms": self.min_send_time if self.min_send_time != float('inf') else 0
            }

class BrokerClient:
    """Simulated broker client that fabricates send results."""

    def __init__(self, nameserver_addr: str):
        self.is_shutdown = False
        self.nameserver_addr = nameserver_addr

    def send_message(self, broker_name: str, message: Message,
                    queue_id: int, timeout: int) -> Dict[str, any]:
        """Pretend to deliver ``message`` and return a fabricated broker reply.

        The reply holds ``queue_offset``, ``offset_msg_id`` and ``region_id``.
        ``message`` and ``timeout`` do not influence the simulated result.

        Raises:
            RemotingException: if the client has been shut down.
        """
        if self.is_shutdown:
            raise RemotingException("Broker客户端已关闭")

        import random

        # Stand-in for network latency.
        time.sleep(0.01)

        fake_offset = random.randint(1000, 9999)
        reply = {"queue_offset": fake_offset}
        reply["offset_msg_id"] = f"{broker_name}_{queue_id}_{fake_offset}"
        reply["region_id"] = "DefaultRegion"
        return reply

    def shutdown(self) -> None:
        """Mark the client as shut down; later sends raise."""
        self.is_shutdown = True

# 使用示例
if __name__ == "__main__":
    # Build a producer configuration with a 5 s timeout and 3 retries.
    config = ProducerConfig(
        producer_group="sync_producer_group",
        nameserver_addr="192.168.1.100:9876",
        send_msg_timeout=5000,
        retry_times_when_send_failed=3
    )
    
    # Create the synchronous producer.
    producer = SyncProducer(config)
    
    try:
        # Start the producer (validates the configuration first).
        producer.start()
        
        # Build a message with a tag and a key.
        message = Message(
            topic="TestTopic",
            body="Hello RocketMQ!".encode('utf-8'),
            tags="TagA",
            keys="OrderID_001"
        )
        
        # Attach custom properties to the message.
        message.put_property("userId", "12345")
        message.put_property("orderId", "ORDER_001")
        
        # Send the message synchronously and print the result.
        send_result = producer.send(message)
        print(f"消息发送成功: {send_result}")
        
        # Send ten more messages as a small test run.
        for i in range(10):
            test_message = Message(
                topic="TestTopic",
                body=f"Test message {i}".encode('utf-8'),
                tags="TagB",
                keys=f"TestKey_{i}"
            )
            
            result = producer.send(test_message)
            print(f"消息 {i} 发送结果: {result.send_status.value}")
        
        # Print the accumulated send statistics.
        statistics = producer.get_send_statistics()
        print(f"\n发送统计信息:")
        for key, value in statistics.items():
            print(f"  {key}: {value}")
        
    except Exception as e:
        # Top-level boundary of the demo: report the failure and fall through to cleanup.
        print(f"发送消息异常: {e}")
    finally:
        # Always shut the producer down.
        producer.shutdown()

3.2.2 队列选择策略

class MessageQueueLoadBalancer:
    """Collect per-queue send counts, last-send timestamps and recent latencies.

    All state is guarded by a single non-reentrant lock. Public methods
    acquire it exactly once and share logic through helpers that expect the
    lock to be held already.
    """

    def __init__(self):
        self.queue_send_count: Dict[MessageQueue, int] = {}
        self.queue_last_send_time: Dict[MessageQueue, int] = {}
        self.queue_send_latency: Dict[MessageQueue, List[int]] = {}
        self.lock = threading.Lock()

    def record_send_result(self, mq: MessageQueue, send_time: int, success: bool) -> None:
        """Record one send to ``mq`` that took ``send_time`` milliseconds.

        ``success`` is accepted but does not currently influence what is
        recorded.
        """
        with self.lock:
            current_time = int(time.time() * 1000)

            self.queue_send_count[mq] = self.queue_send_count.get(mq, 0) + 1
            self.queue_last_send_time[mq] = current_time

            latency_list = self.queue_send_latency.setdefault(mq, [])
            latency_list.append(send_time)

            # Keep only the 100 most recent latency samples.
            if len(latency_list) > 100:
                latency_list.pop(0)

    def _avg_latency_locked(self, mq: MessageQueue) -> float:
        """Return the mean recorded latency of ``mq``; the caller must hold ``self.lock``."""
        latency_list = self.queue_send_latency.get(mq, [])
        if not latency_list:
            return 0.0
        return sum(latency_list) / len(latency_list)

    def get_queue_avg_latency(self, mq: MessageQueue) -> float:
        """Return the mean of the recorded latencies of ``mq``, or 0.0 when there are none."""
        with self.lock:
            return self._avg_latency_locked(mq)

    def get_queue_send_count(self, mq: MessageQueue) -> int:
        """Return how many sends were recorded for ``mq``."""
        with self.lock:
            return self.queue_send_count.get(mq, 0)

    def get_load_balance_info(self) -> Dict[str, object]:
        """Return per-queue statistics keyed by ``"topic#broker_name#queue_id"``."""
        with self.lock:
            info = {}
            for mq in self.queue_send_count:
                queue_key = f"{mq.topic}#{mq.broker_name}#{mq.queue_id}"
                info[queue_key] = {
                    "send_count": self.queue_send_count.get(mq, 0),
                    # Use the lock-free helper here: calling the public
                    # get_queue_avg_latency() would try to take the
                    # non-reentrant lock a second time and block forever.
                    "avg_latency_ms": round(self._avg_latency_locked(mq), 2),
                    "last_send_time": self.queue_last_send_time.get(mq, 0)
                }
            return info

class SelectMessageQueueByLatency(MessageQueueSelector):
    """Pick the queue with the lowest average latency known to a load balancer."""

    def __init__(self, load_balancer: MessageQueueLoadBalancer):
        self.load_balancer = load_balancer

    def select(self, mqs: List[MessageQueue], msg: Message, arg: object) -> MessageQueue:
        """Return the queue with the smallest average latency.

        Ties go to the queue that appears first in ``mqs``. ``msg`` and
        ``arg`` are ignored.

        Raises:
            ValueError: if ``mqs`` is empty.
        """
        if not mqs:
            raise ValueError("消息队列列表不能为空")

        # A single candidate needs no latency lookup.
        if len(mqs) == 1:
            return mqs[0]

        # min() keeps the first of several equally small keys, which matches
        # a strict "<" scan from left to right.
        return min(mqs, key=self.load_balancer.get_queue_avg_latency)

class SelectMessageQueueByRoundRobin(MessageQueueSelector):
    """Cycle through the queues of each topic in order."""

    def __init__(self):
        self.lock = threading.Lock()
        self.topic_queue_index: Dict[str, int] = {}  # topic -> index of the next queue

    def select(self, mqs: List[MessageQueue], msg: Message, arg: object) -> MessageQueue:
        """Return the next queue for ``msg.topic`` and advance that topic's cursor.

        ``arg`` is ignored.

        Raises:
            ValueError: if ``mqs`` is empty.
        """
        if not mqs:
            raise ValueError("消息队列列表不能为空")

        with self.lock:
            queue_total = len(mqs)
            cursor = self.topic_queue_index.get(msg.topic, 0)
            # Store the wrapped successor so the cursor stays within bounds.
            self.topic_queue_index[msg.topic] = (cursor + 1) % queue_total
            return mqs[cursor % queue_total]

class SelectMessageQueueByWeight(MessageQueueSelector):
    """Pick a queue at random, biased by a per-broker weight."""

    def __init__(self, queue_weights: Dict[str, int]):
        """
        queue_weights: weight per broker, in the form {"broker_name": weight}.
        Brokers without an entry get weight 1.
        """
        self.queue_weights = queue_weights
        self.weighted_queues: List[MessageQueue] = []
        # Copy of the queue list the weighted list was last built from; it is
        # used to detect that the candidate queues have changed.
        self._source_queues: List[MessageQueue] = []
        self.lock = threading.Lock()

    def _build_weighted_queues(self, mqs: List[MessageQueue]) -> None:
        """Rebuild the weighted list: each queue appears ``weight`` times."""
        self.weighted_queues.clear()

        for mq in mqs:
            weight = self.queue_weights.get(mq.broker_name, 1)
            # A weight of zero or less contributes no entries.
            self.weighted_queues.extend(mq for _ in range(weight))

        self._source_queues = list(mqs)

    def select(self, mqs: List[MessageQueue], msg: Message, arg: object) -> MessageQueue:
        """Return a queue from ``mqs`` chosen with probability proportional to its weight.

        When every queue has a weight of zero or less, the first queue is
        returned. ``msg`` and ``arg`` are ignored.

        Raises:
            ValueError: if ``mqs`` is empty.
        """
        if not mqs:
            raise ValueError("消息队列列表不能为空")

        with self.lock:
            # Rebuild whenever the candidates differ from the ones the cached
            # list was built from. Otherwise a selector reused for another
            # topic, or after a route change, would return queues that are
            # not in ``mqs``.
            if list(mqs) != self._source_queues:
                self._build_weighted_queues(mqs)

            if not self.weighted_queues:
                return mqs[0]  # fall back to the first queue

            import random
            return random.choice(self.weighted_queues)

class FaultTolerantMessageQueueSelector(MessageQueueSelector):
    """Pick the lowest-latency queue among those below a latency threshold."""

    def __init__(self, load_balancer: MessageQueueLoadBalancer):
        self.load_balancer = load_balancer
        self.fault_tolerance_enabled = True
        self.latency_threshold = 1000  # latency threshold in milliseconds
        self.failure_threshold = 5  # failure threshold; not consulted by the code below

    def select(self, mqs: List[MessageQueue], msg: Message, arg: object) -> MessageQueue:
        """Return a queue, avoiding high-latency queues when fault tolerance is on.

        With fault tolerance disabled the choice is uniformly random. With it
        enabled, the queue with the lowest average latency among the healthy
        queues is returned. When no queue is healthy, all queues are
        considered. ``msg`` and ``arg`` are ignored.

        Raises:
            ValueError: if ``mqs`` is empty.
        """
        if not mqs:
            raise ValueError("消息队列列表不能为空")

        if not self.fault_tolerance_enabled:
            import random
            return random.choice(mqs)

        # Degrade to the full list when every queue is over the threshold.
        candidates = self._filter_healthy_queues(mqs) or mqs

        # min() keeps the first of several equally low latencies.
        return min(candidates, key=self.load_balancer.get_queue_avg_latency)

    def _filter_healthy_queues(self, mqs: List[MessageQueue]) -> List[MessageQueue]:
        """Return the queues whose average latency does not exceed the threshold."""
        average_of = self.load_balancer.get_queue_avg_latency
        return [mq for mq in mqs if average_of(mq) <= self.latency_threshold]

    def set_latency_threshold(self, threshold: int) -> None:
        """Set the latency threshold in milliseconds."""
        self.latency_threshold = threshold

    def enable_fault_tolerance(self, enabled: bool) -> None:
        """Switch fault tolerance on or off."""
        self.fault_tolerance_enabled = enabled

# 使用示例
if __name__ == "__main__":
    # Create the load balancer that collects per-queue statistics.
    load_balancer = MessageQueueLoadBalancer()
    
    # Create four queues spread over two brokers.
    queues = [
        MessageQueue("TestTopic", "broker-a", 0),
        MessageQueue("TestTopic", "broker-a", 1),
        MessageQueue("TestTopic", "broker-b", 0),
        MessageQueue("TestTopic", "broker-b", 1),
    ]
    
    # Record one simulated send per queue, each with a different latency.
    load_balancer.record_send_result(queues[0], 50, True)
    load_balancer.record_send_result(queues[1], 100, True)
    load_balancer.record_send_result(queues[2], 200, True)
    load_balancer.record_send_result(queues[3], 80, True)
    
    # Message used to exercise the different selection strategies.
    message = Message("TestTopic", "test".encode('utf-8'))
    
    # 1. Latency-based selector.
    latency_selector = SelectMessageQueueByLatency(load_balancer)
    selected = latency_selector.select(queues, message, None)
    print(f"延迟选择器选择的队列: {selected.broker_name}#{selected.queue_id}")
    
    # 2. Round-robin selector.
    round_robin_selector = SelectMessageQueueByRoundRobin()
    for i in range(5):
        selected = round_robin_selector.select(queues, message, None)
        print(f"轮询选择器第{i+1}次选择: {selected.broker_name}#{selected.queue_id}")
    
    # 3. Weight-based selector.
    weights = {"broker-a": 3, "broker-b": 1}  # broker-a has weight 3, broker-b has weight 1
    weight_selector = SelectMessageQueueByWeight(weights)
    weight_results = {}
    for i in range(100):
        selected = weight_selector.select(queues, message, None)
        broker = selected.broker_name
        weight_results[broker] = weight_results.get(broker, 0) + 1
    
    print(f"\n权重选择器结果分布:")
    for broker, count in weight_results.items():
        # With exactly 100 draws the raw count is also the percentage.
        print(f"  {broker}: {count}次 ({count}%)")
    
    # 4. Fault-tolerant selector.
    fault_tolerant_selector = FaultTolerantMessageQueueSelector(load_balancer)
    selected = fault_tolerant_selector.select(queues, message, None)
    print(f"\n容错选择器选择的队列: {selected.broker_name}#{selected.queue_id}")
    
    # Print the per-queue load-balancing statistics.
    balance_info = load_balancer.get_load_balance_info()
    print(f"\n负载均衡信息:")
    for queue_key, info in balance_info.items():
        print(f"  {queue_key}: {info}")

3.3 异步发送

3.3.1 异步发送实现

import asyncio
import concurrent.futures
from typing import Awaitable

class AsyncSendCallback(SendCallback):
    """Send callback that tallies outcomes and prints one line per result.

    The success/failure counters are only touched while holding ``self.lock``,
    so one instance can be shared by sends that complete on different threads.
    """

    def __init__(self, callback_name: str = "default"):
        """Create the callback; ``callback_name`` is the tag printed in brackets."""
        self.lock = threading.Lock()
        self.callback_name = callback_name
        self.success_count = 0
        self.failure_count = 0

    def on_success(self, send_result: SendResult) -> None:
        """Count a successful send and print its message id and target queue."""
        with self.lock:
            self.success_count += 1

        target_queue = send_result.message_queue
        summary = (
            f"[{self.callback_name}] 消息发送成功: "
            f"msgId={send_result.msg_id}, "
            f"queue={target_queue.broker_name}#{target_queue.queue_id}"
        )
        print(summary)

    def on_exception(self, exception: Exception) -> None:
        """Count a failed send and print the exception."""
        with self.lock:
            self.failure_count += 1

        summary = f"[{self.callback_name}] 消息发送失败: {exception}"
        print(summary)

    def get_statistics(self) -> Dict[str, int]:
        """Return a snapshot with ``success_count``, ``failure_count`` and their sum."""
        with self.lock:
            succeeded = self.success_count
            failed = self.failure_count

        # Both counters were read under the lock, so the total is consistent.
        return {
            "success_count": succeeded,
            "failure_count": failed,
            "total_count": succeeded + failed
        }

class AsyncProducer:
    """Producer that sends messages asynchronously on a thread pool.

    ``send_async`` validates and (if large enough) compresses the message,
    submits the blocking send-with-retry loop to ``self.executor`` and returns
    a send id immediately. The outcome is reported through the supplied
    ``SendCallback``. Futures of in-flight sends are tracked in
    ``self.pending_sends`` (guarded by ``self.pending_sends_lock``) so that
    ``shutdown`` can wait for them.
    """

    def __init__(self, config: ProducerConfig):
        """Create the collaborators; nothing is started until ``start``."""
        self.config = config
        self.lifecycle_manager = ProducerLifecycleManager(config)
        self.message_queue_selector = SelectMessageQueueByRandom()
        self.send_statistics = SendStatistics()

        # Worker pool that runs the blocking send + retry loop.
        self.executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=10,
            thread_name_prefix="AsyncSend"
        )
        # send_id -> future of the corresponding in-flight send.
        self.pending_sends: Dict[str, concurrent.futures.Future] = {}
        self.pending_sends_lock = threading.Lock()

        # Simulated route table (topic -> queues) and broker client.
        self.topic_route_table: Dict[str, List[MessageQueue]] = {}
        self.broker_client = BrokerClient(config.nameserver_addr)

    def start(self) -> None:
        """Validate the config, start the lifecycle manager and refresh routes."""
        self.config.validate()
        self.lifecycle_manager.start()
        self._update_topic_route_info()

    def shutdown(self) -> None:
        """Wait up to 5 seconds for in-flight sends, then release all resources."""
        self._wait_for_pending_sends(timeout=5.0)

        # Blocks until every worker has finished its current task.
        self.executor.shutdown(wait=True)

        self.lifecycle_manager.shutdown()
        self.broker_client.shutdown()

    def _wait_for_pending_sends(self, timeout: float) -> None:
        """Block until the currently pending sends finish or ``timeout`` seconds pass."""
        # Snapshot under the lock; wait outside it so completing sends can
        # still remove themselves from ``pending_sends``.
        with self.pending_sends_lock:
            pending_futures = list(self.pending_sends.values())

        if not pending_futures:
            return

        print(f"等待 {len(pending_futures)} 个异步发送任务完成...")
        try:
            concurrent.futures.wait(pending_futures, timeout=timeout)
        except Exception as e:
            print(f"等待异步发送任务完成时发生异常: {e}")

    def _update_topic_route_info(self) -> None:
        """Placeholder for fetching route info from the NameServer (no-op)."""
        pass

    def _get_topic_publish_info(self, topic: str) -> List[MessageQueue]:
        """Return the queues for ``topic``, creating simulated defaults on first use.

        The defaults are ``config.default_topic_queue_nums`` queues on
        ``broker-a`` and are cached in ``self.topic_route_table``.
        """
        if topic not in self.topic_route_table:
            self.topic_route_table[topic] = [
                MessageQueue(topic=topic, broker_name="broker-a", queue_id=i)
                for i in range(self.config.default_topic_queue_nums)
            ]

        return self.topic_route_table[topic]

    def _validate_message(self, msg: Message) -> None:
        """Raise ``MQClientException`` if the topic or body is empty or the body is too large."""
        if not msg.topic:
            raise MQClientException("消息Topic不能为空")

        if not msg.body:
            raise MQClientException("消息体不能为空")

        if len(msg.body) > self.config.max_message_size:
            raise MQClientException(f"消息体大小超过限制: {len(msg.body)} > {self.config.max_message_size}")

    def _compress_message(self, msg: Message) -> Message:
        """Return a zlib-compressed copy of ``msg`` when its body exceeds the threshold.

        Bodies no longer than ``config.compress_msg_body_over_howmuch`` are
        returned unchanged. A compressed copy carries the property
        ``COMPRESSED=true``; the original message is never mutated.
        """
        if len(msg.body) <= self.config.compress_msg_body_over_howmuch:
            return msg

        import zlib

        compressed_msg = Message(
            topic=msg.topic,
            body=zlib.compress(msg.body),
            tags=msg.tags,
            keys=msg.keys,
            flag=msg.flag,
            # Copy so that tagging the copy does not touch the caller's dict.
            properties=msg.properties.copy() if msg.properties else {},
            transaction_id=msg.transaction_id
        )
        compressed_msg.put_property("COMPRESSED", "true")
        return compressed_msg

    def send_async(self, msg: Message, callback: SendCallback,
                  timeout: Optional[int] = None) -> str:
        """Queue ``msg`` for asynchronous sending and return its send id.

        Args:
            msg: Message to send; validated and possibly compressed here, on
                the caller's thread.
            callback: Receives ``on_success`` or ``on_exception`` from a
                worker thread once the send has finished.
            timeout: Per-send timeout; defaults to ``config.send_msg_timeout``.

        Raises:
            ProducerException: If the producer is not running.
            MQClientException: If the message fails validation.
        """
        if not self.lifecycle_manager.is_running():
            raise ProducerException("生产者未启动")

        self._validate_message(msg)
        msg = self._compress_message(msg)

        send_id = self._generate_send_id()
        send_timeout = timeout if timeout is not None else self.config.send_msg_timeout

        future = self.executor.submit(
            self._async_send_impl, msg, callback, send_timeout, send_id
        )

        with self.pending_sends_lock:
            self.pending_sends[send_id] = future

        # Registered after the bookkeeping above: if the future is already
        # done the callback runs immediately and still finds the entry.
        future.add_done_callback(lambda _future: self._cleanup_pending_send(send_id))

        return send_id

    def _async_send_impl(self, msg: Message, callback: SendCallback,
                        timeout: int, send_id: str) -> None:
        """Send ``msg`` with retries on a worker thread and notify ``callback``.

        Only failures of the send itself are retried (up to
        ``config.retry_times_when_send_async_failed`` extra attempts). The
        user callback is invoked outside the retry loop, so an exception
        raised by the callback can never cause the message to be sent again.
        """
        start_time = time.time()
        max_retries = self.config.retry_times_when_send_async_failed
        last_exception: Optional[Exception] = None

        for retry_count in range(max_retries + 1):
            try:
                send_result = self._send_default_impl(msg, timeout, retry_count)
            except Exception as e:
                last_exception = e
                if retry_count < max_retries:
                    print(f"异步发送失败,第{retry_count + 1}次重试: {e}")
                    time.sleep(0.1)  # brief pause before the next attempt
                continue

            send_time = int((time.time() - start_time) * 1000)
            self.send_statistics.record_send_success(send_time)
            self._invoke_callback(callback.on_success, send_result)
            return

        # Every attempt failed.
        send_time = int((time.time() - start_time) * 1000)
        self.send_statistics.record_send_failure(send_time)

        final_exception = MQClientException(
            f"异步发送消息失败,已重试{max_retries}次"
        )
        # Keep the last underlying error reachable for whoever inspects it.
        final_exception.__cause__ = last_exception
        self._invoke_callback(callback.on_exception, final_exception)

    def _invoke_callback(self, handler: Callable, argument) -> None:
        """Call a user callback method, reporting (not propagating) its errors."""
        try:
            handler(argument)
        except Exception as e:
            # A faulty callback must not change the recorded outcome of the
            # send, so the error is only reported.
            print(f"异步发送回调执行异常: {e}")

    def _send_default_impl(self, msg: Message, timeout: int, retry_count: int) -> SendResult:
        """Pick a queue for ``msg`` and perform one send attempt.

        ``retry_count`` is accepted for the caller's bookkeeping but is not
        used when selecting the queue.

        Raises:
            MQClientException: If the topic has no queues.
        """
        message_queues = self._get_topic_publish_info(msg.topic)
        if not message_queues:
            raise MQClientException(f"没有找到Topic的路由信息: {msg.topic}")

        selected_queue = self.message_queue_selector.select(message_queues, msg, None)

        return self._send_kernel_impl(
            msg=msg,
            mq=selected_queue,
            communication_mode=CommunicationMode.ASYNC,
            timeout=timeout
        )

    def _send_kernel_impl(self, msg: Message, mq: MessageQueue,
                         communication_mode: CommunicationMode, timeout: int) -> SendResult:
        """Send ``msg`` to the broker owning ``mq`` and build the ``SendResult``.

        ``communication_mode`` is currently not used by this implementation.

        Raises:
            RemotingException: If the broker call or result construction
                fails; the original error is chained as ``__cause__``.
        """
        msg_id = self._generate_message_id()

        try:
            broker_result = self.broker_client.send_message(
                broker_name=mq.broker_name,
                message=msg,
                queue_id=mq.queue_id,
                timeout=timeout
            )

            return SendResult(
                send_status=SendStatus.SEND_OK,
                msg_id=msg_id,
                message_queue=mq,
                queue_offset=broker_result.get("queue_offset", 0),
                offset_msg_id=broker_result.get("offset_msg_id"),
                region_id=broker_result.get("region_id")
            )

        except Exception as e:
            raise RemotingException(f"发送消息到Broker失败: {e}") from e

    def _generate_send_id(self) -> str:
        """Return a random UUID4 string identifying one ``send_async`` call."""
        import uuid
        return str(uuid.uuid4())

    def _generate_message_id(self) -> str:
        """Return the millisecond timestamp followed by 8 random hex characters."""
        import uuid
        timestamp = int(time.time() * 1000)
        unique_id = uuid.uuid4().hex
        return f"{timestamp}{unique_id[:8]}"

    def _cleanup_pending_send(self, send_id: str) -> None:
        """Forget the finished send ``send_id``; unknown ids are ignored."""
        with self.pending_sends_lock:
            self.pending_sends.pop(send_id, None)

    def get_pending_send_count(self) -> int:
        """Return the number of sends that have not finished yet."""
        with self.pending_sends_lock:
            return len(self.pending_sends)

    def get_send_statistics(self) -> Dict[str, any]:
        """Return the send statistics plus ``pending_send_count``."""
        stats = self.send_statistics.get_statistics()
        stats["pending_send_count"] = self.get_pending_send_count()
        return stats

# Usage example: submit ten messages through AsyncProducer and print the
# callback and producer statistics.
if __name__ == "__main__":
    # Build the producer configuration
    config = ProducerConfig(
        producer_group="async_producer_group",
        nameserver_addr="192.168.1.100:9876",
        send_msg_timeout=5000,
        retry_times_when_send_async_failed=2
    )
    
    # Create the asynchronous producer
    producer = AsyncProducer(config)
    
    try:
        # Start the producer
        producer.start()
        
        # One callback instance is shared by all sends below
        callback = AsyncSendCallback("TestCallback")
        
        # Submit several messages asynchronously
        send_ids = []
        for i in range(10):
            message = Message(
                topic="TestTopic",
                body=f"Async message {i}".encode('utf-8'),
                tags="AsyncTag",
                keys=f"AsyncKey_{i}"
            )
            
            send_id = producer.send_async(message, callback)
            send_ids.append(send_id)
            print(f"提交异步发送任务 {i}: {send_id}")
        
        # Give the background sends some time to finish
        print("\n等待异步发送完成...")
        time.sleep(3)
        
        # Callback-side counters
        callback_stats = callback.get_statistics()
        print(f"\n回调统计: {callback_stats}")
        
        # Producer-side statistics
        send_stats = producer.get_send_statistics()
        print(f"发送统计: {send_stats}")
        
    except Exception as e:
        print(f"异步发送异常: {e}")
    finally:
        # Always shut the producer down
        producer.shutdown()

3.3.2 批量异步发送

class BatchAsyncProducer:
    """Buffers messages and hands them to an ``AsyncProducer`` in batches.

    A batch is flushed when it reaches ``batch_size`` messages or when
    ``batch_timeout`` seconds pass without a new message. The buffers and the
    timer are guarded by ``self.batch_lock``, which is a plain (non-reentrant)
    lock: it is held only while the buffers are manipulated and never while
    messages are being submitted.
    """

    def __init__(self, config: ProducerConfig, batch_size: int = 100):
        """Create the producer; ``batch_size`` is the flush threshold."""
        self.config = config
        self.batch_size = batch_size
        self.async_producer = AsyncProducer(config)

        # Parallel buffers: callback_batch[i] belongs to message_batch[i].
        self.message_batch: List[Message] = []
        self.callback_batch: List[SendCallback] = []
        self.batch_lock = threading.Lock()
        self.batch_timer: Optional[threading.Timer] = None
        self.batch_timeout = 1.0  # seconds of inactivity before a flush

        self.batch_statistics = BatchSendStatistics()

    def start(self) -> None:
        """Start the underlying asynchronous producer."""
        self.async_producer.start()

    def shutdown(self) -> None:
        """Flush buffered messages, cancel the timer and stop the producer."""
        self._flush_batch()

        if self.batch_timer:
            self.batch_timer.cancel()

        self.async_producer.shutdown()

    def send_batch_async(self, msg: Message, callback: SendCallback) -> None:
        """Buffer ``msg``; send the whole batch once ``batch_size`` is reached.

        When the batch is not yet full the inactivity timer is restarted
        instead, so the batch is flushed ``batch_timeout`` seconds after the
        most recent message.
        """
        ready_batch = None
        with self.batch_lock:
            self.message_batch.append(msg)
            self.callback_batch.append(callback)

            if len(self.message_batch) >= self.batch_size:
                # Take the batch here, but submit it only after the lock is
                # released: batch_lock is not reentrant.
                ready_batch = self._drain_batch_locked()
            else:
                self._reset_batch_timer()

        if ready_batch is not None:
            self._send_batch_messages(*ready_batch)

    def _reset_batch_timer(self) -> None:
        """Restart the inactivity timer; the caller must hold ``batch_lock``."""
        if self.batch_timer:
            self.batch_timer.cancel()

        self.batch_timer = threading.Timer(self.batch_timeout, self._flush_batch)
        self.batch_timer.start()

    def _drain_batch_locked(self) -> Optional[tuple]:
        """Empty the buffers and return ``(messages, callbacks)``.

        Returns ``None`` when nothing is buffered. The caller must hold
        ``batch_lock``. The pending timer is cancelled because the batch it
        was guarding no longer exists.
        """
        if not self.message_batch:
            return None

        drained_messages = self.message_batch.copy()
        drained_callbacks = self.callback_batch.copy()
        self.message_batch.clear()
        self.callback_batch.clear()

        if self.batch_timer:
            self.batch_timer.cancel()
            self.batch_timer = None

        return drained_messages, drained_callbacks

    def _flush_batch(self) -> None:
        """Send whatever is buffered right now; does nothing for an empty buffer."""
        with self.batch_lock:
            drained = self._drain_batch_locked()

        if drained is not None:
            self._send_batch_messages(*drained)

    def _send_batch_messages(self, messages: List[Message], callbacks: List[SendCallback]) -> None:
        """Submit every message of one batch and record the submission time.

        Each message is sent individually through ``async_producer``; a
        ``BatchSendCallback`` tracks the per-message results of the batch.
        """
        batch_start_time = time.time()

        batch_callback = BatchSendCallback(
            callbacks=callbacks,
            batch_size=len(messages),
            statistics=self.batch_statistics
        )

        for i, msg in enumerate(messages):
            try:
                self.async_producer.send_async(
                    msg=msg,
                    callback=batch_callback.create_message_callback(i),
                    timeout=self.config.send_msg_timeout
                )
            except Exception as e:
                # Submission failed: report it through the batch tracker so
                # the user callback is notified AND the message still counts
                # towards the completion of this batch.
                batch_callback.on_message_complete(i, False, exception=e)

        batch_time = int((time.time() - batch_start_time) * 1000)
        self.batch_statistics.record_batch_send(len(messages), batch_time)

    def get_batch_statistics(self) -> Dict[str, any]:
        """Return the accumulated batch statistics."""
        return self.batch_statistics.get_statistics()

class BatchSendCallback:
    """Tracks the per-message results of one batch.

    Each result is forwarded to the user callback registered for that message
    index. Once ``batch_size`` results have arrived, the success/failure
    totals are reported to ``statistics.record_batch_complete``.
    """

    def __init__(self, callbacks: List[SendCallback], batch_size: int,
                 statistics: 'BatchSendStatistics'):
        """Store the user callbacks (indexed like the batch) and reset the counters."""
        self.callbacks = callbacks
        self.batch_size = batch_size
        self.statistics = statistics
        self.completed_count = 0
        self.success_count = 0
        self.failure_count = 0
        self.lock = threading.Lock()

    def create_message_callback(self, index: int) -> SendCallback:
        """Return a ``SendCallback`` that reports the result of message ``index``."""
        return MessageCallbackWrapper(self, index)

    def on_message_complete(self, index: int, success: bool,
                          send_result: Optional[SendResult] = None,
                          exception: Optional[Exception] = None) -> None:
        """Record the outcome of message ``index`` and notify its user callback.

        The counters are updated under ``self.lock``; the user callback is
        invoked after the lock has been released so that a slow or re-entrant
        callback cannot block the other messages of the batch. Batch
        completion is reported even if the user callback raises; the
        callback's exception still propagates to the caller.
        """
        with self.lock:
            self.completed_count += 1
            if success:
                self.success_count += 1
            else:
                self.failure_count += 1
            # True for exactly one call: the one that delivers the last result.
            batch_done = self.completed_count == self.batch_size

        user_callback = self.callbacks[index]
        try:
            if success:
                user_callback.on_success(send_result)
            else:
                user_callback.on_exception(exception)
        finally:
            if batch_done:
                self._on_batch_complete()

    def _on_batch_complete(self) -> None:
        """Report the final totals of this batch to the statistics object."""
        self.statistics.record_batch_complete(
            total_count=self.batch_size,
            success_count=self.success_count,
            failure_count=self.failure_count
        )

class MessageCallbackWrapper(SendCallback):
    """Per-message callback that forwards its result to a ``BatchSendCallback``.

    The wrapper remembers the position of its message inside the batch and
    tags every forwarded result with that index.
    """

    def __init__(self, batch_callback: BatchSendCallback, index: int):
        """Bind the wrapper to ``batch_callback`` for the message at ``index``."""
        self.index = index
        self.batch_callback = batch_callback

    def on_success(self, send_result: SendResult) -> None:
        """Forward a successful result for this message to the batch callback."""
        report = self.batch_callback.on_message_complete
        report(index=self.index, success=True, send_result=send_result)

    def on_exception(self, exception: Exception) -> None:
        """Forward a failure for this message to the batch callback."""
        report = self.batch_callback.on_message_complete
        report(index=self.index, success=False, exception=exception)

class BatchSendStatistics:
    """Thread-safe counters describing batch submissions and their results."""

    def __init__(self):
        """Start with all counters at zero and no batch time observed yet."""
        self.lock = threading.Lock()
        self.total_batch_count = 0
        self.total_message_count = 0
        self.success_message_count = 0
        self.failure_message_count = 0
        self.total_batch_time = 0
        self.max_batch_time = 0
        # Infinity marks "no batch recorded"; reported as 0 by get_statistics.
        self.min_batch_time = float('inf')

    def record_batch_send(self, message_count: int, batch_time: int) -> None:
        """Account for one submitted batch of ``message_count`` messages.

        ``batch_time`` is added to the running total and folded into the
        observed maximum and minimum.
        """
        with self.lock:
            self.total_batch_count += 1
            self.total_message_count += message_count
            self.total_batch_time += batch_time
            if batch_time > self.max_batch_time:
                self.max_batch_time = batch_time
            if batch_time < self.min_batch_time:
                self.min_batch_time = batch_time

    def record_batch_complete(self, total_count: int, success_count: int, failure_count: int) -> None:
        """Add the per-message results of a finished batch.

        ``total_count`` is accepted but not used; only the success and
        failure counts are accumulated.
        """
        with self.lock:
            self.success_message_count += success_count
            self.failure_message_count += failure_count

    def get_statistics(self) -> Dict[str, any]:
        """Return a snapshot of the counters plus derived rates and averages.

        Ratios whose denominator is zero are reported as 0.
        """
        def ratio(numerator, denominator):
            return numerator / denominator if denominator > 0 else 0

        with self.lock:
            batches = self.total_batch_count
            messages = self.total_message_count

            mean_time = ratio(self.total_batch_time, batches)
            percent_ok = ratio(self.success_message_count, messages) * 100
            mean_size = ratio(messages, batches)
            no_batch_yet = self.min_batch_time == float('inf')

            snapshot = {}
            snapshot["total_batch_count"] = batches
            snapshot["total_message_count"] = messages
            snapshot["success_message_count"] = self.success_message_count
            snapshot["failure_message_count"] = self.failure_message_count
            snapshot["success_rate"] = round(percent_ok, 2)
            snapshot["avg_batch_size"] = round(mean_size, 2)
            snapshot["avg_batch_time_ms"] = round(mean_time, 2)
            snapshot["max_batch_time_ms"] = self.max_batch_time or 0
            snapshot["min_batch_time_ms"] = 0 if no_batch_yet else self.min_batch_time
            return snapshot

# Usage example: feed twelve messages into BatchAsyncProducer and print the
# batch and callback statistics.
if __name__ == "__main__":
    # Build the producer configuration
    config = ProducerConfig(
        producer_group="batch_async_producer_group",
        nameserver_addr="192.168.1.100:9876",
        send_msg_timeout=5000
    )
    
    # Create the batching producer with a flush threshold of 5 messages
    batch_producer = BatchAsyncProducer(config, batch_size=5)
    
    try:
        # Start the producer
        batch_producer.start()
        
        # One callback instance is shared by all messages below
        callback = AsyncSendCallback("BatchCallback")
        
        # Hand the messages to the batching producer one by one
        for i in range(12):  # 12 messages with batch_size=5; intended to be split into 3 batches
            message = Message(
                topic="BatchTestTopic",
                body=f"Batch message {i}".encode('utf-8'),
                tags="BatchTag",
                keys=f"BatchKey_{i}"
            )
            
            batch_producer.send_batch_async(message, callback)
            print(f"添加消息到批量 {i}")
            
            # Simulate a gap between produced messages
            time.sleep(0.1)
        
        # Give the batches some time to be sent
        print("\n等待批量发送完成...")
        time.sleep(3)
        
        # Batch-level statistics
        batch_stats = batch_producer.get_batch_statistics()
        print(f"\n批量发送统计: {json.dumps(batch_stats, indent=2)}")
        
        # Callback-side counters
        callback_stats = callback.get_statistics()
        print(f"\n回调统计: {callback_stats}")
        
    except Exception as e:
        print(f"批量异步发送异常: {e}")
    finally:
        # Always shut the producer down
        batch_producer.shutdown()

3.4 单向发送

3.4.1 单向发送实现

class OnewayProducer:
    """Producer that sends messages one-way, without waiting for a broker reply.

    Every attempt is timed and reported to ``self.send_statistics``; a failed
    attempt is recorded and then re-raised to the caller.
    """

    def __init__(self, config: ProducerConfig):
        """Create the collaborators; nothing is started until ``start``."""
        self.config = config
        self.lifecycle_manager = ProducerLifecycleManager(config)
        self.message_queue_selector = SelectMessageQueueByRandom()
        self.send_statistics = OnewayStatistics()

        # Simulated route table (topic -> queues) and broker client.
        self.topic_route_table: Dict[str, List[MessageQueue]] = {}
        self.broker_client = BrokerClient(config.nameserver_addr)

    def start(self) -> None:
        """Validate the config, start the lifecycle manager and refresh routes."""
        self.config.validate()
        self.lifecycle_manager.start()
        self._update_topic_route_info()

    def shutdown(self) -> None:
        """Stop the lifecycle manager and close the broker client."""
        self.lifecycle_manager.shutdown()
        self.broker_client.shutdown()

    def _update_topic_route_info(self) -> None:
        """Placeholder for fetching route info from the NameServer (no-op)."""
        pass

    def _get_topic_publish_info(self, topic: str) -> List[MessageQueue]:
        """Return the queues for ``topic``, creating simulated defaults on first use.

        The defaults are ``config.default_topic_queue_nums`` queues on
        ``broker-a`` and are cached in ``self.topic_route_table``.
        """
        if topic not in self.topic_route_table:
            self.topic_route_table[topic] = [
                MessageQueue(topic=topic, broker_name="broker-a", queue_id=i)
                for i in range(self.config.default_topic_queue_nums)
            ]

        return self.topic_route_table[topic]

    def _validate_message(self, msg: Message) -> None:
        """Raise ``MQClientException`` if the topic or body is empty or the body is too large."""
        if not msg.topic:
            raise MQClientException("消息Topic不能为空")

        if not msg.body:
            raise MQClientException("消息体不能为空")

        if len(msg.body) > self.config.max_message_size:
            raise MQClientException(f"消息体大小超过限制: {len(msg.body)} > {self.config.max_message_size}")

    def _compress_message(self, msg: Message) -> Message:
        """Return a zlib-compressed copy of ``msg`` when its body exceeds the threshold.

        Bodies no longer than ``config.compress_msg_body_over_howmuch`` are
        returned unchanged. A compressed copy carries the property
        ``COMPRESSED=true``; the original message is never mutated.
        """
        if len(msg.body) <= self.config.compress_msg_body_over_howmuch:
            return msg

        import zlib

        compressed_msg = Message(
            topic=msg.topic,
            body=zlib.compress(msg.body),
            tags=msg.tags,
            keys=msg.keys,
            flag=msg.flag,
            # Copy so that tagging the copy does not touch the caller's dict.
            properties=msg.properties.copy() if msg.properties else {},
            transaction_id=msg.transaction_id
        )
        compressed_msg.put_property("COMPRESSED", "true")
        return compressed_msg

    def send_oneway(self, msg: Message) -> None:
        """Send ``msg`` one-way to a queue chosen by the queue selector.

        Raises:
            ProducerException: If the producer is not running.
            MQClientException: If validation fails or the topic has no queues.
            RemotingException: If the broker client fails to send.
        """
        self._send_oneway_timed(msg, self._select_queue)

    def _send_oneway_impl(self, msg: Message, mq: MessageQueue) -> None:
        """Hand ``msg`` to the broker client for queue ``mq`` without awaiting a reply.

        Raises:
            RemotingException: On any failure; the original error is chained
                as ``__cause__``.
        """
        try:
            self.broker_client.send_message_oneway(
                broker_name=mq.broker_name,
                message=msg,
                queue_id=mq.queue_id
            )

        except Exception as e:
            raise RemotingException(f"单向发送消息到Broker失败: {e}") from e

    def send_oneway_to_queue(self, msg: Message, mq: MessageQueue) -> None:
        """Send ``msg`` one-way to the explicitly given queue ``mq``.

        Raises:
            ProducerException: If the producer is not running.
            MQClientException: If validation fails.
            RemotingException: If the broker client fails to send.
        """
        self._send_oneway_timed(msg, lambda _msg: mq)

    def _select_queue(self, msg: Message) -> MessageQueue:
        """Pick a queue for ``msg`` from its topic's route info."""
        message_queues = self._get_topic_publish_info(msg.topic)
        if not message_queues:
            raise MQClientException(f"没有找到Topic的路由信息: {msg.topic}")

        return self.message_queue_selector.select(message_queues, msg, None)

    def _send_oneway_timed(self, msg: Message,
                           resolve_queue: Callable[[Message], MessageQueue]) -> None:
        """Validate, compress and send ``msg``, recording timing statistics.

        ``resolve_queue`` receives the (possibly compressed) message and
        returns the target queue. The running check happens before timing
        starts, so a "not running" rejection is not counted in the statistics.
        """
        if not self.lifecycle_manager.is_running():
            raise ProducerException("生产者未启动")

        start_time = time.time()

        try:
            self._validate_message(msg)
            msg = self._compress_message(msg)
            self._send_oneway_impl(msg, resolve_queue(msg))
            self.send_statistics.record_send(self._elapsed_ms(start_time))

        except Exception:
            self.send_statistics.record_send_failure(self._elapsed_ms(start_time))
            raise

    @staticmethod
    def _elapsed_ms(start_time: float) -> int:
        """Return whole milliseconds elapsed since ``start_time`` (a ``time.time()`` value)."""
        return int((time.time() - start_time) * 1000)

    def get_send_statistics(self) -> Dict[str, any]:
        """Return the accumulated one-way send statistics."""
        return self.send_statistics.get_statistics()

class OnewayStatistics:
    """Thread-safe counters and timings for one-way sends.

    ``avg``/``max``/``min`` send time describe successful sends only. Time
    spent in failed attempts is still added to ``total_send_time`` but is kept
    out of the average, so the average can never exceed the maximum.
    """

    def __init__(self):
        """Start with all counters at zero and no send time observed yet."""
        self.total_send_count = 0
        self.failure_send_count = 0
        self.total_send_time = 0      # time of all attempts, failed ones included
        self.success_send_time = 0    # time of successful attempts only
        self.max_send_time = 0
        # Infinity marks "no successful send yet"; reported as 0.
        self.min_send_time = float('inf')
        self.lock = threading.Lock()

    def record_send(self, send_time: int) -> None:
        """Record one successful send that took ``send_time`` milliseconds."""
        with self.lock:
            self.total_send_count += 1
            self.total_send_time += send_time
            self.success_send_time += send_time
            self.max_send_time = max(self.max_send_time, send_time)
            self.min_send_time = min(self.min_send_time, send_time)

    def record_send_failure(self, send_time: int = 0) -> None:
        """Record one failed send; a positive ``send_time`` is added to the total time."""
        with self.lock:
            self.total_send_count += 1
            self.failure_send_count += 1
            if send_time > 0:
                self.total_send_time += send_time

    def get_statistics(self) -> Dict[str, any]:
        """Return a snapshot of the counters plus success rate and timings.

        The average send time is taken over successful sends only. Ratios
        whose denominator is zero are reported as 0.
        """
        with self.lock:
            success_count = self.total_send_count - self.failure_send_count

            # Numerator and denominator both cover successful sends only.
            avg_send_time = (self.success_send_time / success_count
                             if success_count > 0 else 0)

            success_rate = (success_count / self.total_send_count * 100
                            if self.total_send_count > 0 else 0)

            min_send_time = (self.min_send_time
                             if self.min_send_time != float('inf') else 0)

            return {
                "total_send_count": self.total_send_count,
                "success_send_count": success_count,
                "failure_send_count": self.failure_send_count,
                "success_rate": round(success_rate, 2),
                "avg_send_time_ms": round(avg_send_time, 2),
                "max_send_time_ms": self.max_send_time,
                "min_send_time_ms": min_send_time
            }

# 扩展BrokerClient以支持单向发送
class BrokerClient:
    """Simulated broker client supporting request/response and one-way sends.

    No network traffic takes place: sends are emulated with short sleeps and
    a randomly generated queue offset.
    """

    def __init__(self, nameserver_addr: str):
        """Remember the NameServer address and mark the client as open."""
        self.is_shutdown = False
        self.nameserver_addr = nameserver_addr

    def _ensure_open(self) -> None:
        """Raise ``RemotingException`` once ``shutdown`` has been called."""
        if self.is_shutdown:
            raise RemotingException("Broker客户端已关闭")

    def send_message(self, broker_name: str, message: Message,
                    queue_id: int, timeout: int) -> Dict[str, any]:
        """Simulate a send and return the broker's reply.

        The reply holds ``queue_offset`` (random, 1000-9999),
        ``offset_msg_id`` (``<broker>_<queue>_<offset>``) and ``region_id``.
        ``message`` and ``timeout`` are accepted but not inspected here.

        Raises:
            RemotingException: If the client has been shut down.
        """
        self._ensure_open()

        # Stand-in for network latency.
        time.sleep(0.01)

        import random
        offset = random.randint(1000, 9999)
        id_parts = (broker_name, queue_id, offset)

        reply = {}
        reply["queue_offset"] = offset
        reply["offset_msg_id"] = "_".join(str(part) for part in id_parts)
        reply["region_id"] = "DefaultRegion"
        return reply

    def send_message_oneway(self, broker_name: str, message: Message, queue_id: int) -> None:
        """Simulate a one-way send: nothing is returned and no reply is awaited.

        Raises:
            RemotingException: If the client has been shut down.
        """
        self._ensure_open()

        # Only the (very short) time needed to hand the request off.
        time.sleep(0.001)

    def shutdown(self) -> None:
        """Mark the client as closed; later sends raise ``RemotingException``."""
        self.is_shutdown = True

# Usage example: send one hundred one-way messages through OnewayProducer and
# print the resulting statistics.
if __name__ == "__main__":
    # Build the producer configuration
    config = ProducerConfig(
        producer_group="oneway_producer_group",
        nameserver_addr="192.168.1.100:9876"
    )
    
    # Create the one-way producer
    producer = OnewayProducer(config)
    
    try:
        # Start the producer
        producer.start()
        
        # Send the messages one-way
        for i in range(100):
            message = Message(
                topic="OnewayTestTopic",
                body=f"Oneway message {i}".encode('utf-8'),
                tags="OnewayTag",
                keys=f"OnewayKey_{i}"
            )
            
            # One-way send: no response is awaited
            producer.send_oneway(message)
            
            # Progress line on every tenth message (i = 0, 10, 20, ...)
            if i % 10 == 0:
                print(f"已发送 {i+1} 条单向消息")
        
        print("\n所有单向消息发送完成")
        
        # Print every statistic on its own line
        statistics = producer.get_send_statistics()
        print(f"\n单向发送统计:")
        for key, value in statistics.items():
            print(f"  {key}: {value}")
        
    except Exception as e:
        print(f"单向发送异常: {e}")
    finally:
        # Always shut the producer down
        producer.shutdown()