sqlserver还原数据库(根据binlog恢复SQL)

sqlserver还原数据库(根据binlog恢复SQL)
根据binlog恢复SQL

SQL MySQL Binlog 回滚工具 - 快速简易无误版

一、工具概述

1.1 设计背景

在生产环境中,数据误操作是不可避免的。常见的误操作场景包括:

  • 误删除 : DELETE 语句删除了不该删除的数据
  • 误更新 : UPDATE 语句修改了不该修改的字段
  • 误插入 :测试数据或错误数据被插入到生产表

MySQL 的 binlog(二进制日志)记录了所有对数据库的修改操作。通过解析 binlog,我们可以: #前端 #MySQL #2025 AI/Vibe Coding 对我的影响

  1. 找到误操作的具体时间和位置
  2. 生成反向的 SQL 语句(回滚 SQL)
  3. 恢复数据到误操作前的状态

1.2 工具定位

本工具是一个 精简版的 MySQL binlog 回滚工具 ,专为解决生产环境无法安装 binlog2sql 等工具且中小规模的数据误操作问题设计。如果是大量数据,基本可以去云厂商提工单进行解决

核心特点

| 特性 | 说明 | | ---

| ✅ 轻量级 | 单个 Python 脚本,无需安装完整项目 | | ✅ 易使用 | 命令行参数简单,输出清晰的 SQL | | ✅ 已验证 | 经过真实环境测试,成功恢复数据 | | ✅ 可扩展 | 代码结构清晰,易于定制和扩展 | | ✅ 兼容性 | 适配 mysql-replication 1.0+ 版本 |

1.3 适用场景

| 场景 | 是否适用 | 说明 | | ---

| 少量数据误删除 | ✅ 推荐 | 快速生成 INSERT 语句恢复 | | 少量数据误更新 | ✅ 推荐 | 生成反向 UPDATE 语句 | | 测试数据误插入 | ✅ 推荐 | 生成 DELETE 语句清理 | | 大规模数据恢复 | ⚠️ 可用 | 建议分批执行 | | 跨表事务恢复 | ❌ 不适用 | 需要手动处理外键约束 |

1.4 不适用场景

  • ❌ binlog 格式为 STATEMENT 或 MIXED(必须是 ROW 格式)
  • ❌ 跨表事务恢复(需要手动处理外键)
  • ❌ binlog 文件已被清理或损坏
  • ❌ 需要回滚 DDL 操作(CREATE/ALTER/DROP)

二、架构设计

2.1 技术架构

2.2 核心模块

2.2.1 BinlogRollback 类

职责 :核心回滚逻辑控制器

class BinlogRollback:    def __init__(connection_settings, save_files=True)    def generate_rollback(log_file, start_time, stop_time, ...)    def _delete_to_insert(schema, table, values)    def _update_to_reverse_update(schema, table, before_values, after_values)    def _insert_to_delete(schema, table, values)    def save_to_file(filename)    def print_sqls(limit=10)

关键设计

  • 单一职责 :专注于 SQL 生成,不涉及业务逻辑
  • 时间过滤 :手动实现时间戳过滤(适配新版 API)
  • 类型安全 :根据数据类型自动转义和格式化

2.2.2 SQL 生成器

| 事件类型 | 转换规则 | 示例 | | ---

| DELETE | 生成 INSERT | DELETE FROM t WHERE id=1 → INSERT INTO t VALUES (1, ...) | | UPDATE | 生成反向 UPDATE | UPDATE t SET name='A' WHERE id=1 → UPDATE t SET name='B' WHERE id=1 | | INSERT | 生成 DELETE | INSERT INTO t VALUES (1, ...) → DELETE FROM t WHERE id=1 |

2.2.3 数据类型处理

| 数据类型 | 处理方式 | 示例 | | ---

| 字符串 | 单引号包裹,转义单引号 | 'O''Reilly' | | 数字 | 直接输出 | 123, 45.67 | | NULL | 输出 NULL | NULL | | 二进制 | 转为 HEX | 0x48656C6C6F | | 时间 | ISO 格式 | '2024-01-15 10:00:00' |

2.3 数据流

Binlog 文件    ↓[解析] BinlogStreamReader    ↓[过滤] 数据库、表、时间范围    ↓[转换] 事件 → SQL 语句    ↓[验证] SQL 语法检查    ↓[输出] SQL 文件

三、核心功能

3.1 功能清单

| 功能 | 说明 | 优先级 | | ---

| 解析 ROW 格式 binlog | 读取并解析 binlog 文件 | P0 | | DELETE → INSERT | 恢复被删除的数据 | P0 | | UPDATE → 反向 UPDATE | 恢复修改前的数据 | P0 | | INSERT → DELETE | 删除误插入的数据 | P0 | | 时间范围过滤 | 按时间范围过滤事件 | P0 | | 位置范围过滤 | 按位置范围过滤事件 | P1 | | 数据库过滤 | 只处理指定数据库 | P1 | | 表过滤 | 只处理指定表 | P1 | | SQL 输出 | 保存到文件或打印到屏幕 | P0 | | 预览功能 | 预览前 N 条 SQL | P1 |

3.2 核心算法

3.2.1 DELETE 转 INSERT

def _delete_to_insert(self, schema, table, values):    """    将 DELETE 事件转换为 INSERT 语句输入: values = {'id': 1, 'name': 'Alice', 'age': 30} 输出: INSERT INTO `db`.`table` (`id`, `name`, `age`) VALUES (1, 'Alice', 30); """# 1. 提取列名columns = [f"`{key}`" for key in values.keys()]# 2. 处理值(类型转换)column_values = [] for key, value in values.items(): if value is None: column_values.append("NULL") elif isinstance(value, str): escaped_value = value.replace("'", "''") column_values.append(f"'{escaped_value}'") elif isinstance(value, bytes): column_values.append(f"0x{value.hex()}") else: column_values.append(str(value))# 3. 组装 SQLsql = f"INSERT INTO `{schema}`.`{table}` ({', '.join(columns)}) VALUES ({', '.join(column_values)});" return sql

3.2.2 时间过滤

# 将时间字符串转换为时间戳(用于过滤)import datetimeif start_time: dt = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') start_timestamp = int(dt.timestamp())if stop_time: dt = datetime.datetime.strptime(stop_time, '%Y-%m-%d %H:%M:%S') stop_timestamp = int(dt.timestamp())# 遍历事件时过滤for binlog_event in stream: if hasattr(binlog_event, 'timestamp') and binlog_event.timestamp: event_time = binlog_event.timestamp# 跳过早于开始时间的事件if start_timestamp and event_time < start_timestamp: continue# 超过结束时间则停止if stop_timestamp and event_time > stop_timestamp: break

四、安装部署

4.1 环境要求

| 组件 | 版本要求 | 说明 | | ---

| Python | 3.6+

| pymysql-replication | 1.0+

| pymysql | 1.0+

| MySQL | 5.7+ | binlog 必须为 ROW 格式 |

4.2 安装步骤

Step 1: 检查 Python 环境

python3 --version# 输出示例: Python 3.12.8

Step 2: 安装依赖

# macOS/Linuxpip3 install pymysql mysql-replication# 验证安装pip3 list | grep -E "pymysql|mysql-replication"

Step 3: 配置 MySQL 权限(数据库账号已经存在的跳过)

--GRANT REPLICATION CLIENT ON *.*GRANT SELECT ON *.* TO 'your_user'@'%';--FLUSH PRIVILEGES;

Step 4: 验证 binlog 格式

--SHOW VARIABLES LIKE 'binlog_format'; -- 输出应为: ROW--SET GLOBAL binlog_format = 'ROW';--[mysqld] binlog_format=ROW--SHOW BINARY LOGS;

Step 5: 获取工具

工具位于(这是我本地位置):/Users/jeffrey/Documents/binlog_rollback.py

# 赋予执行权限chmod +x /Users/jeffrey/Documents/binlog_rollback.py# 测试运行python3 /Users/jeffrey/Documents/binlog_rollback.py --help

4.3 配置文件(可选)

可以创建配置文件 rollback_config.ini :

[mysql]host = localhostport = 3306user = rootpassword = your_password[binlog] log_file = binlog.000001 output_dir = /path/to/output[filter] databases = db1,db2 tables = table1,table2

五、使用指南

5.1 基本语法

python3 /Users/jeffrey/Documents/binlog_rollback.py \  -H  \  -P <端口> \  -u <用户名> \  -p <密码> \  --start-file  \  [其他选项]

5.2 参数说明

必填参数

| 参数 | 说明 | 示例 | | ---

| -u, --user | MySQL 用户名 | root | | -p, --password | MySQL 密码 | your_password | | --start-file | 起始 binlog 文件 | binlog.000001 |

可选参数

| 参数 | 类型 | 默认值 | 说明 | | ---

| -H, --host | string | localhost | MySQL 主机地址 | | -P, --port | int | 3306 | MySQL 端口 | | --start-datetime | string | None | 起始时间(格式:YYYY-MM-DD HH:MM:SS) | | --stop-datetime | string | None | 结束时间 | | --start-pos | int | None | 起始位置 | | --stop-pos | int | None | 结束位置 | | -d, --databases | string | None | 数据库过滤(逗号分隔) | | -t, --tables | string | None | 表过滤(逗号分隔) | | -o, --output | string | rollback.sql | 输出文件名 | | --preview | int | 10 | 预览 SQL 数量 |

实例案例:

python3 /Users/jeffrey/Documents/binlog_rollback.py  -H localhost  -P 3306  -u root  -p '123'  --start-file binlog.000031  --start-datetime "2026-01-15 19:12:00"  --stop-datetime "2026-01-15 19:19:59"  -d testDB  -t testTable  -o users_rollback.sql

5.3 使用场景

场景 1:恢复 DELETE 操作(最常用)

情况 :在 2026-01-15 19:12:00 误删除了数据

python3 /Users/jeffrey/Documents/binlog_rollback.py \  -H localhost \  -P 3306 \  -u root \  -p 'your_password' \  --start-file binlog.000031 \  --start-datetime "2026-01-15 19:12:00" \  --stop-datetime "2026-01-15 19:20:00" \  -d testDB \  -t tags \  -o /Users/jeffrey/Documents/recovery.sql

场景 2:恢复 UPDATE 操作

# 恢复被修改的数据python3 /Users/jeffrey/Documents/binlog_rollback.py \ -H localhost \ -u root \ -p 'password' \ --start-file binlog.000050 \ --start-datetime "2026-01-15 14:00:00" \ --stop-datetime "2026-01-15 14:05:00" \ -d your_db \ -t users \ -o update_recovery.sql

场景 3:删除误插入的测试数据

# 将 INSERT 转换为 DELETEpython3 /Users/jeffrey/Documents/binlog_rollback.py \ -H localhost \ -u root \ -p 'password' \ --start-file binlog.000100 \ --start-datetime "2026-01-15 16:00:00" \ --stop-datetime "2026-01-15 16:30:00" \ -d your_db \ -t test_logs \ -o delete_test_data.sql

场景 4:按位置恢复(时间不准确时)

# 使用 binlog 位置而非时间python3 /Users/jeffrey/Documents/binlog_rollback.py \ -H localhost \ -u root \ -p 'password' \ --start-file binlog.000031 \ --start-pos 456789 \ --stop-pos 457000 \ -d your_db \ -t your_table \ -o recovery.sql

5.4 执行恢复

Step 1: 检查生成的 SQL

# 预览前 20 行head -n 20 /Users/jeffrey/Documents/recovery.sql# 统计 SQL 数量grep -c "INSERT|UPDATE|DELETE" /Users/jeffrey/Documents/recovery.sql

Step 2: 备份当前数据(必须!)

mysqldump -u root -p'password' your_db your_table \  > /Users/jeffrey/Documents/backup_$(date +%Y%m%d_%H%M%S).sql

Step 3: 修复问题(如有)

详见「七、问题诊断」章节

Step 4: 执行恢复

# 方法 1:直接执行mysql -u root -p'password' your_db < /Users/jeffrey/Documents/recovery.sql# 方法 2:忽略主键冲突(推荐)sed -i.bak 's/INSERT INTO/INSERT IGNORE INTO/g' recovery.sql mysql -u root -p'password' your_db < recovery.sql

Step 5: 验证结果

--SELECT COUNT(*) FROM your_table WHERE create_time < '2026-01-15 19:12:00';--SELECT * FROM your_table WHERE id IN (1, 2, 3);

六、实战案例

6.1 案例背景

| 项目 | 信息 | | ---

| 误操作时间 | 2026-01-15 19:12:00 ~ 19:19:59 | | 数据库 | testDB | | 表 | tags | | binlog 文件 | binlog.000031 | | 误操作类型 | DELETE +

| 影响范围 | 删除 id=2,3;插入 id=2004~2013 |

6.2 完整操作流程

Step 1: 确认 binlog 文件

SHOW BINARY LOGS;-- 输出: binlog.000031

Step 2: 生成回滚 SQL

python3 /Users/jeffrey/Documents/binlog_rollback.py \  -H localhost \  -P 3306 \  -u root \  -p '123123' \  --start-file binlog.000031 \  --start-datetime "2026-01-15 19:12:00" \  --stop-datetime "2026-01-15 19:19:59" \  -d testDB \  -t tags \  -o /Users/jeffrey/Documents/users_rollback.sql

输出

✓ 共生成 12 条回滚语句  - 2 条 INSERT(恢复 id=2,3)  - 10 条 DELETE(删除 id=2004~2013)

Step 3: 检查生成的 SQL

head -n 20 /Users/jeffrey/Documents/users_rollback.sql

发现的问题

  1. ❌ 列名显示为 UNKNOWN_COL0 , UNKNOWN_COL1 等
  2. ❌ INSERT 语句中 type 和 status 为 NULL(违反 NOT NULL 约束)
  3. ❌ DELETE 语句 WHERE 条件与实际数据不匹配

Step 4: 获取表结构

mysql -u root -p'123123' testDB -e "DESC tags;"

表结构

+--------------+------------------+------+-----+---------+----------------+| Field        | Type             | Null | Key | Default | Extra          |+--------------+------------------+------+-----+---------+----------------+| name | varchar(128) | NO | MUL | NULL | | | type | enum(...) | NO | MUL | NORMAL | | | status | enum(...) | NO | | ACTIVE | | | tag_desc | varchar(256) | YES | | NULL | | | weight | tinyint(1) | NO | | 1 | | | color | varchar(64) | YES | | NULL | | | deleted | tinyint(1) | NO | | 0 | | | creator | varchar(64) | YES | | NULL | | | modifier | varchar(64) | YES | | NULL | | | create_time | datetime | YES | | NULL | | | modified_time| datetime | YES | | NULL | | +--------------+------------------+------+-----+---------+----------------+

Step 5: 修复 SQL 文件

5.1 替换列名占位符

sed -i.bak \  -e 's/`UNKNOWN_COL0`/`id`/g' \  -e 's/`UNKNOWN_COL1`/`name`/g' \  -e 's/`UNKNOWN_COL2`/`type`/g' \  -e 's/`UNKNOWN_COL3`/`status`/g' \  -e 's/`UNKNOWN_COL4`/`tag_desc`/g' \  -e 's/`UNKNOWN_COL5`/`weight`/g' \  -e 's/`UNKNOWN_COL6`/`color`/g' \  -e 's/`UNKNOWN_COL7`/`deleted`/g' \  -e 's/`UNKNOWN_COL8`/`creator`/g' \  -e 's/`UNKNOWN_COL9`/`modifier`/g' \  -e 's/`UNKNOWN_COL10`/`create_time`/g' \  -e 's/`UNKNOWN_COL11`/`modified_time`/g' \  /Users/jeffrey/Documents/users_rollback.sql

5.2 创建修复版 SQL

cat > /Users/jeffrey/Documents/users_rollback_fixed.sql << 'EOF'SET FOREIGN_KEY_CHECKS=0;--INSERT INTO `testDB`.`tags` (`id`, `name`, `type`, `status`, `tag_desc`, `weight`, `color`, `deleted`, `creator`, `modifier`, `create_time`, `modified_time`) VALUES (3, '2026年度集约运维', 'NORMAL', 'ACTIVE', NULL, 1, NULL, 0, NULL, NULL, '2026-01-15 18:49:45', '2026-01-15 18:49:45');--INSERT INTO `testDB`.`tags` (`id`, `name`, `type`, `status`, `tag_desc`, `weight`, `color`, `deleted`, `creator`, `modifier`, `create_time`, `modified_time`) VALUES (2, '2024年度集约运维', 'NORMAL', 'ACTIVE', NULL, 1, NULL, 0, NULL, NULL, '2026-01-15 18:49:16', '2026-01-15 18:49:16');--DELETE FROM `testDB`.`tags` WHERE `id` = 2004; DELETE FROM `testDB`.`tags` WHERE `id` = 2005; DELETE FROM `testDB`.`tags` WHERE `id` = 2006; DELETE FROM `testDB`.`tags` WHERE `id` = 2007; DELETE FROM `testDB`.`tags` WHERE `id` = 2008; DELETE FROM `testDB`.`tags` WHERE `id` = 2009; DELETE FROM `testDB`.`tags` WHERE `id` = 2010; DELETE FROM `testDB`.`tags` WHERE `id` = 2011; DELETE FROM `testDB`.`tags` WHERE `id` = 2012; DELETE FROM `testDB`.`tags` WHERE `id` = 2013;SET FOREIGN_KEY_CHECKS=1; EOF

Step 6: 备份当前数据

mysqldump -u root -p'123123' testDB tags \  > /Users/jeffrey/Documents/tags_backup_$(date +%Y%m%d_%H%M%S).sql

Step 7: 执行恢复

mysql -u root -p'123123' testDB \  < /Users/jeffrey/Documents/users_rollback_fixed.sql# 输出: ✓ SQL 执行完成

Step 8: 验证结果

--SELECT id, name, type, status, deleted, create_time FROM tags WHERE id IN (2, 3) ORDER BY id;-------- 3 | 测试待删除标签3 | NORMAL | ACTIVE | 0 | 2026-01-15 18:49:45--SELECT COUNT(*) FROM tags WHERE id BETWEEN 2004 AND 2013; -- 结果: 0--SELECT COUNT(*) FROM tags; -- 结果: 3

6.3 恢复结果

| 项目 | 结果 | | ---

| 恢复记录 | ✅ id=2, id=3 已成功恢复 | | 删除记录 | ✅ id=2004~2013 已成功删除 | | 数据完整性 | ✅ 验证通过 | | 备份文件 | ✅ tags_backup_20260115_201731.sql |


七、问题诊断

7.1 常见问题

问题 1:列名显示为 UNKNOWN_COL0 , UNKNOWN_COL1 等

现象

INSERT INTO `db`.`table` (`UNKNOWN_COL0`, `UNKNOWN_COL1`, ...) VALUES (1, 'test', ...);

原因

  • MySQL binlog 配置未包含完整元数据信息
  • pymysql-replication 库无法获取列名

解决方法

  1. 获取表结构
mysql -u root -p -e "DESC your_database.your_table;"
  1. 批量替换列名
sed -i.bak \  -e 's/`UNKNOWN_COL0`/`id`/g' \  -e 's/`UNKNOWN_COL1`/`name`/g' \  -e 's/`UNKNOWN_COL2`/`email`/g' \  rollback.sql

问题 2:INSERT 语句中 NOT NULL 字段为 NULL

现象

INSERT INTO `db`.`table` (`id`, `name`, `type`, `status`) VALUES (1, 'test', NULL, NULL);-- 错误: Column 'type' cannot be null

原因

  • binlog 记录时字段为默认值
  • 但表中设置了 NOT NULL 约束

解决方法

手动添加默认值

--INSERT INTO `db`.`table` (`id`, `name`, `type`, `status`) VALUES (1, 'test', NULL, NULL);--INSERT INTO `db`.`table` (`id`, `name`, `type`, `status`) VALUES (1, 'test', 'NORMAL', 'ACTIVE');

问题 3:DELETE 语句 WHERE 条件不匹配

现象

--DELETE FROM `db`.`table` WHERE `id` = 100 AND `name` = 'test' AND `type` IS NULL -- ⚠️ 实际是 'OPS' AND `status` IS NULL; -- ⚠️ 实际是 'ACTIVE'-- 执行后无记录被删除

原因

  • binlog 记录的值与当前数据库不一致
  • 在误操作之后,记录又被 UPDATE 过

解决方法

简化 WHERE 条件,只使用主键

--DELETE FROM `db`.`table` WHERE `id` = 100 AND `name` = 'test' AND ...;--DELETE FROM `db`.`table` WHERE `id` = 100;

问题 4:主键冲突

现象

INSERT INTO `db`.`table` (`id`, `name`) VALUES (1, 'test');-- 错误: Duplicate entry '1' for key 'PRIMARY'

原因

  • 要恢复的记录 ID 已存在

解决方法

方法 1:使用 INSERT IGNORE

sed -i.bak 's/INSERT INTO/INSERT IGNORE INTO/g' rollback.sql

方法 2:使用 ON DUPLICATE KEY UPDATE

INSERT INTO `db`.`table` (`id`, `name`) VALUES (1, 'test')ON DUPLICATE KEY UPDATE  name = VALUES(name),  deleted = 0;

方法 3:只恢复不存在的记录

INSERT INTO `db`.`table` (id, name)SELECT 1, 'test'FROM DUALWHERE NOT EXISTS (SELECT 1 FROM `db`.`table` WHERE id = 1);

问题 5:缺少 REPLICATION CLIENT 权限

现象

sqlserver还原数据库(根据binlog恢复SQL)

❌ 错误: Access denied; you need (at least one of) the REPLICATION CLIENT privilege(s) for this operation

解决方法

GRANT REPLICATION CLIENT ON *.*FLUSH PRIVILEGES;

问题 6:binlog 文件不存在

现象

❌ 错误: Binlog file not found: binlog.000999

解决方法

  1. 查看可用的 binlog 文件
SHOW BINARY LOGS;
  1. 确认文件名正确
ls -lh /var/lib/mysql/binlog.*
  1. 使用正确的文件名
--start-file binlog.000001  # 使用实际存在的文件

7.2 诊断清单

| 检查项 | 命令 | 预期结果 | | | ---

| Python 版本 | python3 --version | 3.6+

| 依赖安装 | pip3 list | grep -E "pymysql | mysql-replication" | | binlog 格式 | SHOW VARIABLES LIKE 'binlog_format'; | ROW | | | 复制权限 | SHOW GRANTS FOR 'user'@'host'; | 有 REPLICATION CLIENT | | | binlog 文件 | SHOW BINARY LOGS; | 文件存在 | | | 表结构 | DESC table; | 结构正确 | |


八、最佳实践

8.1 恢复流程(推荐)

1. 确认误操作   ↓2. 查找 binlog 文件和时间范围   ↓3. 生成回滚 SQL   ↓4. 检查生成的 SQL   ↓5. 修复问题(列名、默认值、WHERE 条件)   ↓6. 备份当前数据(必须!)   ↓7. 执行恢复   ↓8. 验证结果

8.2 安全建议

| 建议 | 说明 | | ---

| ✅ 备份优先 | 执行前务必备份当前数据 | | ✅ 测试环境验证 | 先在测试环境验证 SQL | | ✅ 分批执行 | 大量数据分批恢复 | | ✅ 停机维护 | 关键业务在维护窗口执行 | | ✅ 双人复核 | SQL 由第二人复核 | | ✅ 日志记录 | 记录所有恢复操作 |

8.3 预防措施

8.3.1 MySQL 配置

--SET GLOBAL log_bin = ON;--SET GLOBAL binlog_format = 'ROW';--SET GLOBAL expire_logs_days = 7;----CHANGE MASTER TO MASTER_DELAY = 3600;

8.3.2 应用层防护

--SET SQL_SAFE_UPDATES = 1;--CREATE TABLE backup_table_20260115 AS SELECT * FROM target_table;--BEGIN; DELETE FROM target_table WHERE id = 100; ---- ROLLBACK; 或 COMMIT;

8.3.3 运维规范

| 规范 | 说明 | | ---

| 定期备份 | 每日全量备份 +

| 备份验证 | 每周测试恢复流程 | | 权限管理 | 最小权限原则 | | 操作审计 | 记录所有 DML 操作 | | 演练 | 每月进行恢复演练 |

8.4 性能优化

大批量数据恢复

# 分批执行(每 1000 条一批)split -l 1000 rollback.sql /tmp/batch_for file in /tmp/batch_*; do mysql -u root -p database < "$file" echo "Restored: $file" sleep 1 # 避免负载过高done

禁用索引(加速)

--ALTER TABLE table DISABLE KEYS; --ALTER TABLE table ENABLE KEYS;--SET unique_checks=0; SET foreign_key_checks=0; --SET unique_checks=1; SET foreign_key_checks=1;

附录A:恢复方案对比

A.1 方案总览

| 方案 | 难度 | 速度 | 安全性 | 适用场景 | 推荐指数 | | ---

| 精简版工具 | ⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 中小规模恢复 | ⭐⭐⭐⭐⭐ | | mysqlbinlog 手工 | ⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | 少量数据 | ⭐⭐⭐ | | 原版 binlog2sql | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 中大规模 | ⭐⭐⭐⭐ | | 临时实例恢复 | ⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | 大规模/复杂场景 | ⭐⭐⭐ | | 全量备份恢复 | ⭐⭐⭐⭐ | ⭐ | ⭐⭐⭐⭐⭐ | 灾难恢复 | ⭐⭐⭐⭐ |

A.2 详细对比

方案 1:精简版工具(本文档)

优势

  • ✅ 单个脚本,易于部署
  • ✅ 命令简单,上手快
  • ✅ 已验证可用
  • ✅ 支持时间/位置过滤
  • ✅ 自动生成回滚 SQL

劣势

  • ⚠️ 需要手动修复列名和默认值
  • ⚠️ 不支持复杂事务
  • ⚠️ 依赖 Python 环境

适用场景

  • 中小规模数据误操作(< 10,000 条)
  • 快速恢复需求
  • 有 Python 环境

方案 2:mysqlbinlog 手工解析

优势

  • ✅ MySQL 官方工具
  • ✅ 无需额外依赖
  • ✅ 灵活性高

劣势

  • ❌ 需要手工转换 SQL
  • ❌ 耗时较长
  • ❌ 容易出错

适用场景

  • 极少量数据(< 100 条)
  • 无 Python 环境
  • 紧急情况

基本命令

# 导出 binlogmysqlbinlog --base64-output=DECODE-ROWS -v \ --start-datetime="2026-01-15 10:00:00" \ /var/lib/mysql/binlog.000001 > binlog_output.txt# 手工转换为 SQL

方案 3:原版 binlog2sql

优势

  • ✅ 功能完整
  • ✅ 社区支持
  • ✅ 持续更新
  • ✅ 支持更多特性

劣势

  • ⚠️ 需要安装完整项目
  • ⚠️ 配置较复杂
  • ⚠️ GitHub 访问可能受限

安装

pip install mysql-replication pymysqlgit clone https://github.com/danfengcao/binlog2sql.gitcd binlog2sqlpip install -r requirements.txt

使用

python binlog2sql.py \  -h localhost -u root -p 'password' \  --start-file binlog.000001 \  --start-datetime "2026-01-15 10:00:00" \  -B -d database -t table > rollback.sql

方案 4:临时实例恢复

优势

  • ✅ 最安全,不影响生产
  • ✅ 可以精确控制恢复点
  • ✅ 支持复杂场景

劣势

  • ❌ 需要额外资源
  • ❌ 耗时最长
  • ❌ 操作复杂

流程

# 1. 准备临时环境# 2. 恢复全量备份mysql < full_backup.sql# 3. 应用 binlogmysqlbinlog --start-position=154 \ /var/lib/mysql/binlog.000001 | mysql# 4. 导出数据mysqldump database table > restore_data.sql# 5. 导入到生产mysql -u root -p database < restore_data.sql

方案 5:全量备份恢复

优势

  • ✅ 最简单直接
  • ✅ 适合灾难恢复

劣势

  • ❌ 会丢失误操作后的新数据
  • ❌ 耗时长
  • ❌ 需要停机

流程

# 1. 停止应用systemctl stop app# 2. 恢复备份mysql < full_backup.sql# 3. 应用 binlog(到误操作前一刻)mysqlbinlog --stop-datetime="2026-01-15 10:00:00" \ /var/lib/mysql/binlog.000001 | mysql# 4. 启动应用systemctl start app

附录B:完整工具脚本

B.1 binlog_rollback.py

位置 :/Users/jeffrey/Documents/binlog_rollback.py

#!/usr/bin/env python3# -*- coding: utf-8 -*-""" MySQL Binlog 回滚工具 -基于 pymysql-replication 库实现功能:- DELETE → INSERT(恢复被删除的数据) - UPDATE → 反向 UPDATE(恢复修改前的数据) - INSERT → DELETE(删除误插入的数据)作者:Jeffrey 版本:v1.0 最后更新:2026-01-15 """import sys import argparse from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import ( DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent, )# MySQL 连接配置示例MYSQL_SETTINGS = { 'host': 'localhost', 'port': 3306, 'user': 'root', 'passwd': '', }class BinlogRollback: """Binlog 回滚 SQL 生成器"""def __init__(self, connection_settings, save_files=True): """ 初始化回滚工具Args: connection_settings: MySQL 连接配置 save_files: 是否保存到文件 """ self.connection_settings = connection_settings self.save_files = save_files self.rollback_sqls = [] self.start_time_filter = None self.stop_time_filter = Nonedef generate_rollback(self, log_file, start_time=None, stop_time=None, start_pos=None, stop_pos=None, databases=None, tables=None): """ 生成回滚 SQLArgs: log_file: binlog 文件名 start_time: 开始时间 stop_time: 结束时间 start_pos: 开始位置 stop_pos: 结束位置 databases: 数据库过滤 tables: 表过滤 """ # 构建 BinLogStreamReader 参数(适配 mysql-replication 1.0+)stream_kwargs = { 'connection_settings': self.connection_settings, 'server_id': 100, 'blocking': False, 'log_file': log_file, 'only_events': [DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent], 'resume_stream': False, }# 添加位置参数(如果提供)if start_pos: stream_kwargs['log_pos'] = start_pos if stop_pos: stream_kwargs['end_pos'] = stop_pos# 注意:新版本 mysql-replication 已移除 start_time/end_time 参数# 时间过滤需要在事件读取时手动实现self.start_time_filter = start_time self.stop_time_filter = stop_timestream = BinLogStreamReader(**stream_kwargs)# 将时间字符串转换为时间戳(用于过滤)import datetime start_timestamp = None stop_timestamp = None if start_time: dt = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') start_timestamp = int(dt.timestamp()) if stop_time: dt = datetime.datetime.strptime(stop_time, '%Y-%m-%d %H:%M:%S') stop_timestamp = int(dt.timestamp())for binlog_event in stream: # 时间过滤(基于事件时间戳)if hasattr(binlog_event, 'timestamp') and binlog_event.timestamp: event_time = binlog_event.timestamp if start_timestamp is not None and event_time < start_timestamp: continue if stop_timestamp is not None and event_time > stop_timestamp: breakfor row in binlog_event.rows: event_type = type(binlog_event)# 过滤数据库和表if databases and binlog_event.schema not in databases: continue if tables and binlog_event.table not in tables: continue# 生成回滚 SQLif event_type == DeleteRowsEvent: # DELETE → INSERTsql = self._delete_to_insert( binlog_event.schema, binlog_event.table, row["values"] ) self.rollback_sqls.append(sql)elif event_type == UpdateRowsEvent: # UPDATE → UPDATE(反向)sql = self._update_to_reverse_update( binlog_event.schema, binlog_event.table, row["before_values"], row["after_values"] ) self.rollback_sqls.append(sql)elif event_type == WriteRowsEvent: # INSERT → DELETEsql = self._insert_to_delete( binlog_event.schema, binlog_event.table, row["values"] ) self.rollback_sqls.append(sql)stream.close() return self.rollback_sqlsdef _delete_to_insert(self, schema, table, values): """ DELETE 转换为 INSERTArgs: schema: 数据库名 table: 表名 values: 被删除的行数据Returns: INSERT SQL 语句 """ columns = [] column_values = []for key, value in values.items(): columns.append(f"`{key}`") if value is None: column_values.append("NULL") elif isinstance(value, str): # 转义单引号escaped_value = value.replace("'", "''").replace("\", "\\") column_values.append(f"'{escaped_value}'") elif isinstance(value, bytes): # 二进制数据转 hexcolumn_values.append(f"0x{value.hex()}") else: column_values.append(str(value))columns_str = ", ".join(columns) values_str = ", ".join(column_values)sql = f"INSERT INTO `{schema}`.`{table}` ({columns_str}) VALUES ({values_str});" return sqldef _update_to_reverse_update(self, schema, table, before_values, after_values): """ UPDATE 转换为反向 UPDATEArgs: schema: 数据库名 table: 表名 before_values: 更新前的值 after_values: 更新后的值Returns: UPDATE SQL 语句(反向) """ set_clause = [] where_clause = []# SET 子句使用更新前的值for key, value in before_values.items(): if value is None: set_clause.append(f"`{key}` = NULL") elif isinstance(value, str): escaped_value = value.replace("'", "''").replace("\", "\\") set_clause.append(f"`{key}` = '{escaped_value}'") elif isinstance(value, bytes): set_clause.append(f"`{key}` = 0x{value.hex()}") else: set_clause.append(f"`{key}` = {value}")# WHERE 子句使用更新后的值for key, value in after_values.items(): if value is None: where_clause.append(f"`{key}` IS NULL") elif isinstance(value, str): escaped_value = value.replace("'", "''").replace("\", "\\") where_clause.append(f"`{key}` = '{escaped_value}'") elif isinstance(value, bytes): where_clause.append(f"`{key}` = 0x{value.hex()}") else: where_clause.append(f"`{key}` = {value}")set_str = ", ".join(set_clause) where_str = " AND ".join(where_clause)sql = f"UPDATE `{schema}`.`{table}` SET {set_str} WHERE {where_str};" return sqldef _insert_to_delete(self, schema, table, values): """ INSERT 转换为 DELETEArgs: schema: 数据库名 table: 表名 values: 被插入的行数据Returns: DELETE SQL 语句 """ where_clause = []for key, value in values.items(): if value is None: where_clause.append(f"`{key}` IS NULL") elif isinstance(value, str): escaped_value = value.replace("'", "''").replace("\", "\\") where_clause.append(f"`{key}` = '{escaped_value}'") elif isinstance(value, bytes): where_clause.append(f"`{key}` = 0x{value.hex()}") else: where_clause.append(f"`{key}` = {value}")where_str = " AND ".join(where_clause)sql = f"DELETE FROM `{schema}`.`{table}` WHERE {where_str};" return sqldef save_to_file(self, filename): """ 保存回滚 SQL 到文件Args: filename: 输出文件名 """ with open(filename, 'w', encoding='utf-8') as f: f.write("-- MySQL Binlog 回滚 SQL\n") f.write("-- 生成时间: " + str(sys.argv) + "\n") f.write("-- 警告: 执行前请务必检查 SQL 语句!\n\n") f.write("SET FOREIGN_KEY_CHECKS=0;\n\n")for sql in self.rollback_sqls: f.write(sql + "\n")f.write("\nSET FOREIGN_KEY_CHECKS=1;\n") f.write(f"-- 共生成 {len(self.rollback_sqls)} 条回滚语句\n")print(f"✓ 回滚 SQL 已保存到: {filename}") print(f"✓ 共生成 {len(self.rollback_sqls)} 条回滚语句")def print_sqls(self, limit=10): """ 打印前 N 条 SQLArgs: limit: 显示数量 """ print("\n=== 预览前 {} 条回滚 SQL ===\n".format(limit)) for i, sql in enumerate(self.rollback_sqls[:limit], 1): print(f"{i}. {sql}")if len(self.rollback_sqls) > limit: print(f"\n... 还有 {len(self.rollback_sqls) - limit} 条语句")def main(): """主函数""" parser = argparse.ArgumentParser( description='MySQL Binlog 回滚工具 - 生成 DELETE/UPDATE/INSERT 的回滚 SQL' )# MySQL 连接参数parser.add_argument('-H', '--host', dest='host', default='localhost', help='MySQL 主机地址 (默认: localhost)') parser.add_argument('-P', '--port', dest='port', type=int, default=3306, help='MySQL 端口 (默认: 3306)') parser.add_argument('-u', '--user', dest='user', required=True, help='MySQL 用户名') parser.add_argument('-p', '--password', dest='password', required=True, help='MySQL 密码')# Binlog 参数parser.add_argument('--start-file', dest='start_file', required=True, help='起始 binlog 文件 (如: mysql-bin.000001)') parser.add_argument('--start-datetime', dest='start_datetime', help='起始时间 (如: 2024-01-15 10:00:00)') parser.add_argument('--stop-datetime', dest='stop_datetime', help='结束时间 (如: 2024-01-15 10:05:00)') parser.add_argument('--start-pos', dest='start_pos', type=int, help='起始位置') parser.add_argument('--stop-pos', dest='stop_pos', type=int, help='结束位置')# 过滤参数parser.add_argument('-d', '--databases', dest='databases', help='数据库列表 (逗号分隔)') parser.add_argument('-t', '--tables', dest='tables', help='表名列表 (逗号分隔)')# 输出参数parser.add_argument('-o', '--output', dest='output', default='rollback.sql', help='输出文件名 (默认: rollback.sql)') parser.add_argument('--preview', dest='preview', type=int, default=10, help='预览 SQL 数量 (默认: 10)')args = parser.parse_args()# 构建 MySQL 连接配置settings = { 'host': args.host, 'port': args.port, 'user': args.user, 'passwd': args.password, }# 处理过滤参数databases = args.databases.split(',') if args.databases else None tables = args.tables.split(',') if args.tables else Noneprint("=" * 60) print("MySQL Binlog 回滚工具") print("=" * 60) print(f"连接信息: {args.user}@{args.host}:{args.port}") print(f"Binlog 文件: {args.start_file}") if args.start_datetime: print(f"时间范围: {args.start_datetime} ~ {args.stop_datetime}") if databases: print(f"数据库: {databases}") if tables: print(f"表: {tables}") print("=" * 60)# 创建回滚工具实例rollback = BinlogRollback(connection_settings=settings)try: # 生成回滚 SQLprint("\n 正在解析 binlog...") rollback.generate_rollback( log_file=args.start_file, start_time=args.start_datetime, stop_time=args.stop_datetime, start_pos=args.start_pos, stop_pos=args.stop_pos, databases=databases, tables=tables, )if not rollback.rollback_sqls: print("\n⚠️ 未找到需要回滚的操作!") print("请检查:") print(" 1. binlog 文件是否正确") print(" 2. 时间范围是否包含误操作时间") print(" 3. 数据库和表名是否正确") return# 预览 SQLrollback.print_sqls(limit=args.preview)# 保存到文件print(f"\n 正在保存到文件: {args.output}") rollback.save_to_file(args.output)print("\n" + "=" * 60) print("✓ 回滚 SQL 生成完成!") print("=" * 60) print("\n 下一步操作:") print(f"1. 检查生成的 SQL 文件: {args.output}") print("2. 确认无误后,执行以下命令恢复数据:") print(f" mysql -u{args.user} -p {databases[0] if databases else 'your_database'} < {args.output}") print("\n⚠️ 注意: 执行前请务必备份当前数据!")except Exception as e: print(f"\n❌ 错误: {str(e)}") print("\n 可能的原因:") print(" 1. MySQL 连接失败(检查用户名、密码、权限)") print(" 2. binlog 文件不存在或路径错误") print(" 3. 缺少 REPLICATION CLIENT 权限") print("\n 解决方法:") print(" GRANT REPLICATION CLIENT ON *.* TO 'your_user'@'%';") sys.exit(1)if __name__ == '__main__': main()

B.2 一键恢复脚本

位置 :/Users/jeffrey/Documents/auto_recovery.sh

#!/bin/bash# MySQL Binlog 自动恢复脚本## 功能:自动执行 binlog 恢复流程# 使用:bash auto_recovery.shset -e # 遇到错误立即退出# ==================== 配置区 ====================# MySQL 连接信息DB_HOST="localhost" DB_PORT="3306" DB_USER="root" DB_PASS="your_password"# Binlog 信息BINLOG_FILE="binlog.000031" START_TIME="2026-01-15 19:12:00" STOP_TIME="2026-01-15 19:20:00"# 过滤条件DATABASE="testDB" TABLE="tags"# 输出目录OUTPUT_DIR="/Users/jeffrey/Documents" TOOL_PATH="/Users/jeffrey/Documents/binlog_rollback.py"# ==================== 执行区 ====================echo "==================================================" echo "MySQL Binlog 自动恢复脚本" echo "==================================================" echo "数据库: $DATABASE" echo "表: $TABLE" echo "时间范围: $START_TIME ~ $STOP_TIME" echo "=================================================="# Step 1: 生成回滚 SQLecho "" echo "[1/5] 生成回滚 SQL..." OUTPUT_FILE="$OUTPUT_DIR/recovery_$(date +%Y%m%d_%H%M%S).sql"python3 "$TOOL_PATH" \ -H "$DB_HOST" \ -P "$DB_PORT" \ -u "$DB_USER" \ -p "$DB_PASS" \ --start-file "$BINLOG_FILE" \ --start-datetime "$START_TIME" \ --stop-datetime "$STOP_TIME" \ -d "$DATABASE" \ -t "$TABLE" \ -o "$OUTPUT_FILE"if [ ! -f "$OUTPUT_FILE" ]; then echo "❌ SQL 文件生成失败!" exit 1 fi# Step 2: 预览 SQLecho "" echo "[2/5] 预览生成的 SQL..." head -n 20 "$OUTPUT_FILE"# Step 3: 备份当前数据echo "" echo "[3/5] 备份当前数据..." BACKUP_FILE="$OUTPUT_DIR/backup_${TABLE}_$(date +%Y%m%d_%H%M%S).sql" mysqldump -h"$DB_HOST" -P"$DB_PORT" -u"$DB_USER" -p"$DB_PASS" \ "$DATABASE" "$TABLE" > "$BACKUP_FILE" echo "✓ 备份完成: $BACKUP_FILE"# Step 4: 确认执行echo "" echo "[4/5] 确认信息" echo "SQL 文件: $OUTPUT_FILE" echo "备份文件: $BACKUP_FILE" echo "" read -p "确认执行恢复?(yes/no): " confirmif [ "$confirm" != "yes" ]; then echo "取消恢复" exit 0 fi# Step 5: 执行恢复echo "" echo "[5/5] 执行恢复..." mysql -h"$DB_HOST" -P"$DB_PORT" -u"$DB_USER" -p"$DB_PASS" \ "$DATABASE" < "$OUTPUT_FILE"echo "✓ 恢复完成!"# 验证echo "" echo "验证结果:" mysql -h"$DB_HOST" -P"$DB_PORT" -u"$DB_USER" -p"$DB_PASS" \ "$DATABASE" -e "SELECT COUNT(*) AS total FROM $TABLE;"echo "" echo "==================================================" echo "恢复完成!" echo "==================================================" echo "SQL 文件: $OUTPUT_FILE" echo "备份文件: $BACKUP_FILE" echo "=================================================="

B.3 快速修复脚本

位置 :/Users/jeffrey/Documents/fix_sql.sh

#!/bin/bash# SQL 修复脚本# 功能:自动修复列名占位符和默认值问题SQL_FILE="$1"if [ -z "$SQL_FILE" ]; then echo "用法: $0 " exit 1 fiecho "修复 SQL 文件: $SQL_FILE"# 备份原文件cp "$SQL_FILE" "$SQL_FILE.backup"# 替换列名占位符(根据实际情况修改)sed -i.bak \ -e 's/`UNKNOWN_COL0`/`id`/g' \ -e 's/`UNKNOWN_COL1`/`name`/g' \ -e 's/`UNKNOWN_COL2`/`type`/g' \ -e 's/`UNKNOWN_COL3`/`status`/g' \ -e 's/`UNKNOWN_COL4`/`tag_desc`/g' \ -e 's/`UNKNOWN_COL5`/`weight`/g' \ -e 's/`UNKNOWN_COL6`/`color`/g' \ -e 's/`UNKNOWN_COL7`/`deleted`/g' \ -e 's/`UNKNOWN_COL8`/`creator`/g' \ -e 's/`UNKNOWN_COL9`/`modifier`/g' \ -e 's/`UNKNOWN_COL10`/`create_time`/g' \ -e 's/`UNKNOWN_COL11`/`modified_time`/g' \ "$SQL_FILE"echo "✓ 修复完成!" echo "原始文件: $SQL_FILE.backup" echo "修复后文件: $SQL_FILE"

文章版权声明:除非注明,否则均为边学边练网络文章,版权归原作者所有