数据库建模软件(异步数据库操作!用 SQLAlchemy 2.0 + async 写高效 CRUD)

数据库建模软件(异步数据库操作!用 SQLAlchemy 2.0 + async 写高效 CRUD)
异步数据库操作!用 SQLAlchemy 2.0 + async 写高效 CRUD

如果你还在用同步 ORM(如 Flask-SQLAlchemy),你的 API 在高并发下会阻塞等待 I/O,吞吐量骤降。

今天,我们用 SQLAlchemy 2.0 + async/await,写出非阻塞、高并发的数据库操作!


今日目标

实现一个异步用户注册接口,将数据安全存入 PostgreSQL(或 SQLite)

要求:

  • 使用 SQLAlchemy 2.0 的新式写法
  • 全程 async/await
  • 自动处理连接池与事务

✅ 步骤 1:安装依赖

pip install sqlalchemy asyncpg  # PostgreSQL# 或pip install sqlalchemy aiosqlite  # SQLite(开发测试用)

SQLAlchemy 2.0 起,原生支持 asyncio,无需第三方包装器!


✅ 步骤 2:定义数据库模型(User 表)

# models.pyfrom sqlalchemy import Integer, Stringfrom sqlalchemy.orm import DeclarativeBase, Mapped, mapped_columnclass Base(DeclarativeBase):    passclass User(Base):    __tablename__ = "users"    id: Mapped[int] = mapped_column(Integer, primary_key=True)    username: Mapped[str] = mapped_column(String(50), unique=True)    email: Mapped[str] = mapped_column(String(100), unique=True)

注意:SQLAlchemy 2.0 推荐使用 Mapped 类型注解,更清晰、更类型安全!

# database.pyfrom sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.orm import sessionmaker# 开发用 SQLite(文件数据库)DATABASE_URL = "sqlite+aiosqlite:///./test.db"# 生产推荐 PostgreSQL:# DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"engine = create_async_engine(DATABASE_URL, echo=True)  # echo=True 打印 SQLAsyncSessionLocal = sessionmaker(    bind=engine,    class_=AsyncSession,    expire_on_commit=False)# 初始化数据库表async def init_db():    async with engine.begin() as conn:        await conn.run_sync(Base.metadata.create_all)

✅ 步骤 4:编写异步 CRUD 函数

# crud.pyfrom sqlalchemy.future import selectfrom .database import AsyncSessionLocalfrom .models import Userasync def create_user(username: str, email: str):    async with AsyncSessionLocal() as session:        async with session.begin():  # 自动提交或回滚            user = User(username=username, email=email)            session.add(user)            await session.flush()  # 获取自增 ID(如果需要)            await session.refresh(user)  # 同步对象状态            return user

✅ 关键点:

  • async with session.begin() → 自动管理事务
  • flush() → 立即写入 DB(获取 ID)
  • refresh() → 从 DB 重新加载对象(含默认值、触发器结果等)

✅ 步骤 5:集成到 FastAPI 路由

# main.pyfrom fastapi import FastAPI, HTTPExceptionfrom pydantic import BaseModelfrom .crud import create_userfrom .database import init_dbapp = FastAPI()@app.on_event("startup")async def startup():    await init_db()  # 启动时创建表class UserCreate(BaseModel):    username: str    email: str@app.post("/users", response_model=dict)async def register(user: UserCreate):    try:        db_user = await create_user(user.username, user.email)        return {            "code": 201,            "message": "User created",            "data": {"id": db_user.id, "username": db_user.username}        }    except Exception as e:        # 实际应捕获 IntegrityError 等具体异常        raise HTTPException(status_code=400, detail="Username or email already exists")

测试效果

启动服务:

uvicorn main:app --reload、

POST /users:

{ "username": "alice", "email": "alice@example.com" }

✅ 返回:

{  "code": 201,  "message": "User created",  "data": { "id": 1, "username": "alice" }}

同时,控制台打印出执行的 SQL(因 echo=True)!


安全与最佳实践

问题

解决方案

并发插入重复用户名

数据库加 UNIQUE 约束 + 捕获 IntegrityError

长时间运行阻塞事件循环

所有 DB 操作必须 await,勿混用同步代码

连接泄漏

使用 async with 确保会话自动关闭

敏感信息泄露

不要直接返回 User 对象,用 Pydantic 模型过滤字段

SQLAlchemy 1.x vs 2.0 异步写法对比

操作

1.x(旧)

2.0(新)

数据库建模软件(异步数据库操作!用 SQLAlchemy 2.0 + async 写高效 CRUD)

查询

session.query(User).filter(...)

select(User).where(...)

执行

result = await session.execute(stmt)

同左

获取结果

user = result.scalars().first()

同左

事务

手动 commit()/rollback()

async with session.begin() 自动管理

SQLAlchemy 2.0 更接近原生 SQL 思维,且与 asyncio 深度集成!


文章版权声明:除非注明,否则均为边学边练网络文章,版权归原作者所有

相关阅读