异步数据库操作!用 SQLAlchemy 2.0 + async 写高效 CRUD
如果你还在用同步 ORM(如 Flask-SQLAlchemy),你的 API 在高并发下会阻塞等待 I/O,吞吐量骤降。
今天,我们用 SQLAlchemy 2.0 + async/await,写出非阻塞、高并发的数据库操作!
今日目标
实现一个异步用户注册接口,将数据安全存入 PostgreSQL(或 SQLite)
要求:
- 使用 SQLAlchemy 2.0 的新式写法
- 全程 async/await
- 自动处理连接池与事务
✅ 步骤 1:安装依赖
pip install sqlalchemy asyncpg # PostgreSQL# 或pip install sqlalchemy aiosqlite # SQLite(开发测试用)SQLAlchemy 2.0 起,原生支持 asyncio,无需第三方包装器!
✅ 步骤 2:定义数据库模型(User 表)
# models.pyfrom sqlalchemy import Integer, Stringfrom sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnclass Base(DeclarativeBase): passclass User(Base): __tablename__ = "users" id: Mapped[int] = mapped_column(Integer, primary_key=True) username: Mapped[str] = mapped_column(String(50), unique=True) email: Mapped[str] = mapped_column(String(100), unique=True)注意:SQLAlchemy 2.0 推荐使用 Mapped 类型注解,更清晰、更类型安全!
# database.pyfrom sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.orm import sessionmaker# 开发用 SQLite(文件数据库)DATABASE_URL = "sqlite+aiosqlite:///./test.db"# 生产推荐 PostgreSQL:# DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"engine = create_async_engine(DATABASE_URL, echo=True) # echo=True 打印 SQLAsyncSessionLocal = sessionmaker( bind=engine, class_=AsyncSession, expire_on_commit=False)# 初始化数据库表async def init_db(): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all)✅ 步骤 4:编写异步 CRUD 函数
# crud.pyfrom sqlalchemy.future import selectfrom .database import AsyncSessionLocalfrom .models import Userasync def create_user(username: str, email: str): async with AsyncSessionLocal() as session: async with session.begin(): # 自动提交或回滚 user = User(username=username, email=email) session.add(user) await session.flush() # 获取自增 ID(如果需要) await session.refresh(user) # 同步对象状态 return user✅ 关键点:
- async with session.begin() → 自动管理事务
- flush() → 立即写入 DB(获取 ID)
- refresh() → 从 DB 重新加载对象(含默认值、触发器结果等)
✅ 步骤 5:集成到 FastAPI 路由
# main.pyfrom fastapi import FastAPI, HTTPExceptionfrom pydantic import BaseModelfrom .crud import create_userfrom .database import init_dbapp = FastAPI()@app.on_event("startup")async def startup(): await init_db() # 启动时创建表class UserCreate(BaseModel): username: str email: str@app.post("/users", response_model=dict)async def register(user: UserCreate): try: db_user = await create_user(user.username, user.email) return { "code": 201, "message": "User created", "data": {"id": db_user.id, "username": db_user.username} } except Exception as e: # 实际应捕获 IntegrityError 等具体异常 raise HTTPException(status_code=400, detail="Username or email already exists")测试效果
启动服务:
uvicorn main:app --reload、POST /users:
{ "username": "alice", "email": "alice@example.com" }✅ 返回:
{ "code": 201, "message": "User created", "data": { "id": 1, "username": "alice" }}同时,控制台打印出执行的 SQL(因 echo=True)!
安全与最佳实践
问题 | 解决方案 |
并发插入重复用户名 | 数据库加 UNIQUE 约束 + 捕获 IntegrityError |
长时间运行阻塞事件循环 | 所有 DB 操作必须 await,勿混用同步代码 |
连接泄漏 | 使用 async with 确保会话自动关闭 |
敏感信息泄露 | 不要直接返回 User 对象,用 Pydantic 模型过滤字段 |
SQLAlchemy 1.x vs 2.0 异步写法对比
操作 | 1.x(旧) | 2.0(新)
|
查询 | session.query(User).filter(...) | select(User).where(...) |
执行 | result = await session.execute(stmt) | 同左 |
获取结果 | user = result.scalars().first() | 同左 |
事务 | 手动 commit()/rollback() | async with session.begin() 自动管理 |
✅ SQLAlchemy 2.0 更接近原生 SQL 思维,且与 asyncio 深度集成!
文章版权声明:除非注明,否则均为边学边练网络文章,版权归原作者所有
