接了个外包做数据迁移,1000万数据2小时搞定
上周接了个外包项目,客户要把MySQL数据迁移到PostgreSQL,1000万条订单数据,核心要求是不能停机。
一开始我想用传统的SQL导出导入方案,但算了一下时间:导出2小时,导入2小时,加上数据校验,至少要3天。而且这期间业务完全不能用,客户肯定不同意。
后来我用了Debezium + Kafka的实时同步方案,2小时就搞定了,而且全程不停机,客户非常满意,直接加了2000块钱。
传统方案的问题
为什么不能直接用SQL导出导入?主要有这几个问题:
1. 速度慢:1000万数据导出需要2小时,导入又要2小时,总共至少3天
2. 必须停机:导出期间不能写入新数据,业务完全中断
3. 内存爆炸:一次性加载大量数据,服务器内存扛不住
4. 数据丢失:导出期间产生的新数据会丢失
5. 回滚困难:迁移失败后很难恢复到原状态
客户说业务不能停,这些问题都不能接受。所以我必须找一个零停机、零丢失的方案。
我的解决方案
经过调研,我选择了Debezium + Kafka的实时同步方案。核心思路是:
1. Debezium 监听 MySQL binlog:捕获所有数据变更(INSERT/UPDATE/DELETE)
2. 变更事件发送到 Kafka:消息队列缓冲数据,削峰填谷
3. 消费者批量写入 PostgreSQL:从 Kafka 读取事件,批量写入目标库
4. 全量数据分批迁移:每批10万条,避免内存溢出
5. 增量数据实时同步:通过 binlog 实时捕获,零数据丢失
整个架构是这样的:
MySQL (binlog) → Debezium → Kafka → Consumer → PostgreSQL核心配置
Debezium 的配置文件(JSON格式):
{ "name": "mysql-connector", "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "localhost", "database.port": "3306", "database.user": "root", "database.password": "password", "database.server.id": "1", "database.server.name": "mysql-server", "database.include.list": "orders", "table.include.list": "orders.order_info", "snapshot.mode": "initial", "database.history.kafka.bootstrap.servers": "localhost:9092", "database.history.kafka.topic": "schema-changes"}关键配置说明:
• snapshot.mode = initial:先做全量同步,再做增量同步
• database.include.list:指定要同步的数据库
• table.include.list:指定要同步的表
MySQL binlog 必须开启,配置文件(my.cnf)加上:
log-bin=mysql-binbinlog_format=ROWserver-id=1消费者程序(Python示例):
from kafka import KafkaConsumerimport psycopg2import jsonconsumer = KafkaConsumer( 'mysql-server.orders.order_info', bootstrap_servers=['localhost:9092'], value_deserializer=lambda m: json.loads(m.decode('utf-8')))conn = psycopg2.connect( host="localhost", database="orders", user="postgres", password="password")cursor = conn.cursor()batch = []batch_size = 1000for message in consumer: event = message.value if event['op'] == 'c': # INSERT batch.append(event['after']) if len(batch) >= batch_size: # 批量插入 cursor.executemany( "INSERT INTO order_info VALUES (%s, %s, %s)", batch ) conn.commit() batch = []最终效果

用这套方案,最终效果非常好:
• 迁移时间:2小时(原来要3天)
• 停机时间:0秒(业务完全不受影响)
• 数据丢失:0条(实时同步保证完整性)
• 效率提升:36倍
客户看到效果后非常满意,直接加了2000块钱,还说下次有类似项目还找我。
避坑指南
在实施过程中,我踩了几个坑,分享给大家:
1. MySQL binlog 必须开启:配置文件加上 log-bin=mysql-bin 和 binlog_format=ROW,否则 Debezium 无法工作
2. Kafka 分区数要合理:建议设置为消费者数量的2-3倍,提高并发能力
3. 批量写入控制大小:每批1000-5000条最合适,太大会内存溢出,太小效率低
4. 监控同步延迟:实时监控 Kafka lag,延迟超过10秒要告警,否则数据会越积越多
5. 做好数据校验:迁移完成后对比总行数和关键字段MD5,确保数据一致性
小贴士:
如果数据量特别大(亿级),可以考虑用多个消费者并行处理,进一步提升速度。
总结
通过 Debezium + Kafka 方案,成功实现了千万级数据的零停机迁移,效率提升36倍。这套方案的核心优势是:
• 不停机:业务完全不受影响
• 零丢失:实时同步保证数据完整性
• 可回滚:支持随时回滚到原数据库
• 高效率:分批迁移+批量写入,速度快
如果你也有大规模数据迁移的需求,可以试试这套方案。