12.1 项目概述
12.1.1 项目介绍
本章将通过一个完整的博客系统项目,综合运用前面所学的Flask知识,包括: - 用户认证与授权 - 文章管理系统 - 评论系统 - 文件上传 - 搜索功能 - 缓存优化 - API接口 - 部署运维
12.1.2 技术栈
# 技术栈清单
backend:
framework: Flask 2.3+
database: PostgreSQL 14+
cache: Redis 6+
search: Elasticsearch 8+
task_queue: Celery 5+
frontend:
template_engine: Jinja2
css_framework: Bootstrap 5
javascript: Vanilla JS + HTMX
deployment:
containerization: Docker
orchestration: Docker Compose
web_server: Nginx
wsgi_server: Gunicorn
monitoring:
metrics: Prometheus
logging: ELK Stack
apm: Sentry
12.1.3 项目结构
blog_project/
├── app/
│ ├── __init__.py
│ ├── models/
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── article.py
│ │ ├── comment.py
│ │ └── category.py
│ ├── views/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── main.py
│ │ ├── article.py
│ │ ├── admin.py
│ │ └── api/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── articles.py
│ │ └── comments.py
│ ├── forms/
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── article.py
│ │ └── comment.py
│ ├── services/
│ │ ├── __init__.py
│ │ ├── auth_service.py
│ │ ├── article_service.py
│ │ ├── search_service.py
│ │ ├── cache_service.py
│ │ └── email_service.py
│ ├── utils/
│ │ ├── __init__.py
│ │ ├── decorators.py
│ │ ├── helpers.py
│ │ ├── validators.py
│ │ └── pagination.py
│ ├── static/
│ │ ├── css/
│ │ ├── js/
│ │ ├── images/
│ │ └── uploads/
│ └── templates/
│ ├── base.html
│ ├── auth/
│ ├── articles/
│ ├── admin/
│ └── components/
├── migrations/
├── tests/
├── config/
├── scripts/
├── docker/
├── requirements/
├── .env.example
├── docker-compose.yml
├── Dockerfile
└── run.py
12.2 核心模型设计
12.2.1 用户模型
# app/models/user.py
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin
from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import URLSafeTimedSerializer
from app import db
class User(UserMixin, db.Model):
"""用户模型"""
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(255), nullable=False)
# 个人信息
first_name = db.Column(db.String(50))
last_name = db.Column(db.String(50))
bio = db.Column(db.Text)
avatar = db.Column(db.String(255))
website = db.Column(db.String(255))
location = db.Column(db.String(100))
# 状态字段
is_active = db.Column(db.Boolean, default=True)
is_verified = db.Column(db.Boolean, default=False)
is_admin = db.Column(db.Boolean, default=False)
# 时间戳
created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
last_login = db.Column(db.DateTime)
# 关系
articles = db.relationship('Article', backref='author', lazy='dynamic', cascade='all, delete-orphan')
comments = db.relationship('Comment', backref='author', lazy='dynamic', cascade='all, delete-orphan')
# 关注关系
followed = db.relationship(
'User', secondary='user_follows',
primaryjoin='User.id == user_follows.c.follower_id',
secondaryjoin='User.id == user_follows.c.followed_id',
backref=db.backref('followers', lazy='dynamic'),
lazy='dynamic'
)
def __repr__(self):
return f'<User {self.username}>'
@property
def password(self):
raise AttributeError('密码不可读取')
@password.setter
def password(self, password):
self.password_hash = generate_password_hash(password)
def verify_password(self, password):
"""验证密码"""
return check_password_hash(self.password_hash, password)
def generate_token(self, expiration=3600):
"""生成令牌"""
from app import current_app
s = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
return s.dumps({'user_id': self.id})
@staticmethod
def verify_token(token, expiration=3600):
"""验证令牌"""
from app import current_app
s = URLSafeTimedSerializer(current_app.config['SECRET_KEY'])
try:
data = s.loads(token, max_age=expiration)
return User.query.get(data['user_id'])
except:
return None
def follow(self, user):
"""关注用户"""
if not self.is_following(user):
self.followed.append(user)
def unfollow(self, user):
"""取消关注"""
if self.is_following(user):
self.followed.remove(user)
def is_following(self, user):
"""检查是否关注"""
return self.followed.filter(
user_follows.c.followed_id == user.id
).count() > 0
@property
def full_name(self):
"""完整姓名"""
if self.first_name and self.last_name:
return f'{self.first_name} {self.last_name}'
return self.username
@property
def article_count(self):
"""文章数量"""
return self.articles.filter_by(is_published=True).count()
@property
def follower_count(self):
"""粉丝数量"""
return self.followers.count()
@property
def following_count(self):
"""关注数量"""
return self.followed.count()
def to_dict(self, include_email=False):
"""转换为字典"""
data = {
'id': self.id,
'username': self.username,
'full_name': self.full_name,
'bio': self.bio,
'avatar': self.avatar,
'website': self.website,
'location': self.location,
'article_count': self.article_count,
'follower_count': self.follower_count,
'following_count': self.following_count,
'created_at': self.created_at.isoformat() if self.created_at else None,
'last_login': self.last_login.isoformat() if self.last_login else None
}
if include_email:
data['email'] = self.email
return data
# 关注关系表
user_follows = db.Table(
'user_follows',
db.Column('follower_id', db.Integer, db.ForeignKey('users.id'), primary_key=True),
db.Column('followed_id', db.Integer, db.ForeignKey('users.id'), primary_key=True)
)
12.2.2 文章模型
# app/models/article.py
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.ext.hybrid import hybrid_property
from app import db
import re
class Article(db.Model):
"""文章模型"""
__tablename__ = 'articles'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False, index=True)
slug = db.Column(db.String(200), unique=True, nullable=False, index=True)
content = db.Column(db.Text, nullable=False)
summary = db.Column(db.Text)
# 状态字段
is_published = db.Column(db.Boolean, default=False, index=True)
is_featured = db.Column(db.Boolean, default=False)
allow_comments = db.Column(db.Boolean, default=True)
# 统计字段
view_count = db.Column(db.Integer, default=0)
like_count = db.Column(db.Integer, default=0)
comment_count = db.Column(db.Integer, default=0)
# 时间戳
created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
published_at = db.Column(db.DateTime, index=True)
# 外键
author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
category_id = db.Column(db.Integer, db.ForeignKey('categories.id'), index=True)
# 关系
comments = db.relationship('Comment', backref='article', lazy='dynamic', cascade='all, delete-orphan')
tags = db.relationship('Tag', secondary='article_tags', backref=db.backref('articles', lazy='dynamic'))
def __repr__(self):
return f'<Article {self.title}>'
@hybrid_property
def reading_time(self):
"""阅读时间(分钟)"""
if not self.content:
return 0
# 计算字数(简单估算)
word_count = len(re.findall(r'\w+', self.content))
# 假设每分钟阅读200字
return max(1, word_count // 200)
def generate_slug(self):
"""生成URL slug"""
import re
from unidecode import unidecode
# 转换为ASCII
slug = unidecode(self.title.lower())
# 替换非字母数字字符为连字符
slug = re.sub(r'[^a-z0-9]+', '-', slug)
# 去除首尾连字符
slug = slug.strip('-')
# 确保唯一性
original_slug = slug
counter = 1
while Article.query.filter_by(slug=slug).first():
slug = f'{original_slug}-{counter}'
counter += 1
return slug
def publish(self):
"""发布文章"""
self.is_published = True
self.published_at = datetime.utcnow()
if not self.slug:
self.slug = self.generate_slug()
def unpublish(self):
"""取消发布"""
self.is_published = False
self.published_at = None
def increment_view_count(self):
"""增加浏览次数"""
self.view_count += 1
db.session.commit()
def get_summary(self, length=200):
"""获取摘要"""
if self.summary:
return self.summary
# 从内容中提取摘要
import re
# 移除HTML标签
text = re.sub(r'<[^>]+>', '', self.content)
# 截取指定长度
if len(text) <= length:
return text
return text[:length].rsplit(' ', 1)[0] + '...'
@property
def published_comments(self):
"""已发布的评论"""
return self.comments.filter_by(is_approved=True)
def to_dict(self, include_content=False):
"""转换为字典"""
data = {
'id': self.id,
'title': self.title,
'slug': self.slug,
'summary': self.get_summary(),
'is_published': self.is_published,
'is_featured': self.is_featured,
'view_count': self.view_count,
'like_count': self.like_count,
'comment_count': self.comment_count,
'reading_time': self.reading_time,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None,
'published_at': self.published_at.isoformat() if self.published_at else None,
'author': self.author.to_dict() if self.author else None,
'category': self.category.to_dict() if self.category else None,
'tags': [tag.to_dict() for tag in self.tags]
}
if include_content:
data['content'] = self.content
return data
class Category(db.Model):
"""分类模型"""
__tablename__ = 'categories'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(100), unique=True, nullable=False, index=True)
slug = db.Column(db.String(100), unique=True, nullable=False, index=True)
description = db.Column(db.Text)
color = db.Column(db.String(7), default='#007bff') # 十六进制颜色
# 时间戳
created_at = db.Column(db.DateTime, default=datetime.utcnow)
# 关系
articles = db.relationship('Article', backref='category', lazy='dynamic')
def __repr__(self):
return f'<Category {self.name}>'
@property
def article_count(self):
"""文章数量"""
return self.articles.filter_by(is_published=True).count()
def to_dict(self):
"""转换为字典"""
return {
'id': self.id,
'name': self.name,
'slug': self.slug,
'description': self.description,
'color': self.color,
'article_count': self.article_count,
'created_at': self.created_at.isoformat() if self.created_at else None
}
class Tag(db.Model):
"""标签模型"""
__tablename__ = 'tags'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), unique=True, nullable=False, index=True)
slug = db.Column(db.String(50), unique=True, nullable=False, index=True)
# 时间戳
created_at = db.Column(db.DateTime, default=datetime.utcnow)
def __repr__(self):
return f'<Tag {self.name}>'
@property
def article_count(self):
"""文章数量"""
return len([a for a in self.articles if a.is_published])
def to_dict(self):
"""转换为字典"""
return {
'id': self.id,
'name': self.name,
'slug': self.slug,
'article_count': self.article_count,
'created_at': self.created_at.isoformat() if self.created_at else None
}
# 文章标签关联表
article_tags = db.Table(
'article_tags',
db.Column('article_id', db.Integer, db.ForeignKey('articles.id'), primary_key=True),
db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)
12.2.3 评论模型
# app/models/comment.py
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
from app import db
class Comment(db.Model):
"""评论模型"""
__tablename__ = 'comments'
id = db.Column(db.Integer, primary_key=True)
content = db.Column(db.Text, nullable=False)
# 状态字段
is_approved = db.Column(db.Boolean, default=False, index=True)
is_spam = db.Column(db.Boolean, default=False)
# 时间戳
created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 外键
author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
article_id = db.Column(db.Integer, db.ForeignKey('articles.id'), nullable=False, index=True)
parent_id = db.Column(db.Integer, db.ForeignKey('comments.id'), index=True)
# 关系
replies = db.relationship(
'Comment',
backref=db.backref('parent', remote_side=[id]),
lazy='dynamic',
cascade='all, delete-orphan'
)
def __repr__(self):
return f'<Comment {self.id} by {self.author.username}>'
def approve(self):
"""批准评论"""
self.is_approved = True
# 更新文章评论数
if self.article:
self.article.comment_count = self.article.comments.filter_by(is_approved=True).count()
def mark_as_spam(self):
"""标记为垃圾评论"""
self.is_spam = True
self.is_approved = False
@property
def is_reply(self):
"""是否为回复"""
return self.parent_id is not None
@property
def approved_replies(self):
"""已批准的回复"""
return self.replies.filter_by(is_approved=True)
def to_dict(self, include_replies=False):
"""转换为字典"""
data = {
'id': self.id,
'content': self.content,
'is_approved': self.is_approved,
'is_reply': self.is_reply,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None,
'author': self.author.to_dict() if self.author else None,
'article_id': self.article_id,
'parent_id': self.parent_id
}
if include_replies:
data['replies'] = [reply.to_dict() for reply in self.approved_replies]
return data
12.3 服务层设计
12.3.1 认证服务
# app/services/auth_service.py
from datetime import datetime, timedelta
from flask import current_app
from flask_login import login_user, logout_user
from app import db
from app.models.user import User
from app.services.email_service import EmailService
import secrets
import hashlib
class AuthService:
"""认证服务"""
def __init__(self):
self.email_service = EmailService()
def register_user(self, username, email, password, **kwargs):
"""注册用户"""
# 检查用户名和邮箱是否已存在
if User.query.filter_by(username=username).first():
raise ValueError('用户名已存在')
if User.query.filter_by(email=email).first():
raise ValueError('邮箱已被注册')
# 创建用户
user = User(
username=username,
email=email,
password=password,
**kwargs
)
db.session.add(user)
db.session.commit()
# 发送验证邮件
self.send_verification_email(user)
return user
def authenticate_user(self, username_or_email, password, remember_me=False):
"""用户认证"""
# 查找用户(支持用户名或邮箱登录)
user = User.query.filter(
(User.username == username_or_email) |
(User.email == username_or_email)
).first()
if not user or not user.verify_password(password):
raise ValueError('用户名/邮箱或密码错误')
if not user.is_active:
raise ValueError('账户已被禁用')
# 更新最后登录时间
user.last_login = datetime.utcnow()
db.session.commit()
# 登录用户
login_user(user, remember=remember_me)
return user
def logout_user(self):
"""用户登出"""
logout_user()
def send_verification_email(self, user):
"""发送验证邮件"""
token = user.generate_token(expiration=86400) # 24小时有效
self.email_service.send_email(
to=user.email,
subject='验证您的邮箱地址',
template='auth/verify_email.html',
user=user,
token=token
)
def verify_email(self, token):
"""验证邮箱"""
user = User.verify_token(token, expiration=86400)
if not user:
raise ValueError('验证链接无效或已过期')
user.is_verified = True
db.session.commit()
return user
def send_password_reset_email(self, email):
"""发送密码重置邮件"""
user = User.query.filter_by(email=email).first()
if not user:
raise ValueError('邮箱地址不存在')
token = user.generate_token(expiration=3600) # 1小时有效
self.email_service.send_email(
to=user.email,
subject='重置您的密码',
template='auth/reset_password.html',
user=user,
token=token
)
def reset_password(self, token, new_password):
"""重置密码"""
user = User.verify_token(token, expiration=3600)
if not user:
raise ValueError('重置链接无效或已过期')
user.password = new_password
db.session.commit()
return user
def change_password(self, user, old_password, new_password):
"""修改密码"""
if not user.verify_password(old_password):
raise ValueError('原密码错误')
user.password = new_password
db.session.commit()
return user
def update_profile(self, user, **kwargs):
"""更新用户资料"""
allowed_fields = [
'first_name', 'last_name', 'bio',
'website', 'location', 'avatar'
]
for field, value in kwargs.items():
if field in allowed_fields:
setattr(user, field, value)
user.updated_at = datetime.utcnow()
db.session.commit()
return user
def deactivate_user(self, user):
"""停用用户"""
user.is_active = False
db.session.commit()
def activate_user(self, user):
"""激活用户"""
user.is_active = True
db.session.commit()
12.3.2 文章服务
# app/services/article_service.py
from datetime import datetime
from flask import current_app
from sqlalchemy import or_, and_
from app import db
from app.models.article import Article, Category, Tag
from app.services.search_service import SearchService
from app.services.cache_service import CacheService
class ArticleService:
"""文章服务"""
def __init__(self):
self.search_service = SearchService()
self.cache_service = CacheService()
def create_article(self, author, title, content, **kwargs):
"""创建文章"""
article = Article(
title=title,
content=content,
author=author,
**kwargs
)
# 生成slug
if not article.slug:
article.slug = article.generate_slug()
# 处理分类
if 'category_name' in kwargs:
category = self.get_or_create_category(kwargs['category_name'])
article.category = category
# 处理标签
if 'tag_names' in kwargs:
tags = self.get_or_create_tags(kwargs['tag_names'])
article.tags = tags
db.session.add(article)
db.session.commit()
# 如果文章已发布,添加到搜索索引
if article.is_published:
self.search_service.index_article(article)
return article
def update_article(self, article, **kwargs):
"""更新文章"""
# 更新基本字段
allowed_fields = [
'title', 'content', 'summary', 'is_featured',
'allow_comments'
]
for field, value in kwargs.items():
if field in allowed_fields:
setattr(article, field, value)
# 更新分类
if 'category_name' in kwargs:
category = self.get_or_create_category(kwargs['category_name'])
article.category = category
# 更新标签
if 'tag_names' in kwargs:
tags = self.get_or_create_tags(kwargs['tag_names'])
article.tags = tags
article.updated_at = datetime.utcnow()
db.session.commit()
# 更新搜索索引
if article.is_published:
self.search_service.update_article(article)
# 清除缓存
self.cache_service.delete(f'article:{article.id}')
return article
def publish_article(self, article):
"""发布文章"""
article.publish()
db.session.commit()
# 添加到搜索索引
self.search_service.index_article(article)
return article
def unpublish_article(self, article):
"""取消发布文章"""
article.unpublish()
db.session.commit()
# 从搜索索引中移除
self.search_service.remove_article(article)
return article
def delete_article(self, article):
"""删除文章"""
# 从搜索索引中移除
self.search_service.remove_article(article)
# 清除缓存
self.cache_service.delete(f'article:{article.id}')
db.session.delete(article)
db.session.commit()
def get_article_by_slug(self, slug, increment_view=False):
"""根据slug获取文章"""
cache_key = f'article:slug:{slug}'
article = self.cache_service.get(cache_key)
if not article:
article = Article.query.filter_by(slug=slug, is_published=True).first()
if article:
self.cache_service.set(cache_key, article, timeout=3600)
if article and increment_view:
article.increment_view_count()
return article
def get_articles(self, page=1, per_page=10, **filters):
"""获取文章列表"""
query = Article.query
# 应用过滤器
if filters.get('published_only', True):
query = query.filter_by(is_published=True)
if filters.get('author_id'):
query = query.filter_by(author_id=filters['author_id'])
if filters.get('category_id'):
query = query.filter_by(category_id=filters['category_id'])
if filters.get('tag_id'):
query = query.filter(Article.tags.any(id=filters['tag_id']))
if filters.get('featured_only'):
query = query.filter_by(is_featured=True)
if filters.get('search'):
search_term = f"%{filters['search']}%"
query = query.filter(
or_(
Article.title.ilike(search_term),
Article.content.ilike(search_term)
)
)
# 排序
order_by = filters.get('order_by', 'created_at')
if order_by == 'popular':
query = query.order_by(Article.view_count.desc())
elif order_by == 'likes':
query = query.order_by(Article.like_count.desc())
else:
query = query.order_by(Article.created_at.desc())
return query.paginate(
page=page,
per_page=per_page,
error_out=False
)
def get_popular_articles(self, limit=5, days=30):
"""获取热门文章"""
cache_key = f'popular_articles:{limit}:{days}'
articles = self.cache_service.get(cache_key)
if not articles:
from datetime import timedelta
since_date = datetime.utcnow() - timedelta(days=days)
articles = Article.query.filter(
and_(
Article.is_published == True,
Article.created_at >= since_date
)
).order_by(
Article.view_count.desc()
).limit(limit).all()
self.cache_service.set(cache_key, articles, timeout=3600)
return articles
def get_related_articles(self, article, limit=5):
"""获取相关文章"""
cache_key = f'related_articles:{article.id}:{limit}'
related = self.cache_service.get(cache_key)
if not related:
# 基于标签和分类查找相关文章
query = Article.query.filter(
and_(
Article.id != article.id,
Article.is_published == True
)
)
# 优先显示同分类的文章
if article.category:
query = query.filter_by(category_id=article.category.id)
# 如果同分类文章不够,再查找有共同标签的文章
related = query.limit(limit).all()
if len(related) < limit and article.tags:
tag_ids = [tag.id for tag in article.tags]
additional = Article.query.filter(
and_(
Article.id != article.id,
Article.is_published == True,
Article.tags.any(Tag.id.in_(tag_ids))
)
).limit(limit - len(related)).all()
related.extend(additional)
self.cache_service.set(cache_key, related, timeout=1800)
return related
def get_or_create_category(self, name):
"""获取或创建分类"""
category = Category.query.filter_by(name=name).first()
if not category:
from unidecode import unidecode
import re
slug = unidecode(name.lower())
slug = re.sub(r'[^a-z0-9]+', '-', slug).strip('-')
category = Category(name=name, slug=slug)
db.session.add(category)
db.session.commit()
return category
def get_or_create_tags(self, tag_names):
"""获取或创建标签"""
tags = []
for name in tag_names:
tag = Tag.query.filter_by(name=name).first()
if not tag:
from unidecode import unidecode
import re
slug = unidecode(name.lower())
slug = re.sub(r'[^a-z0-9]+', '-', slug).strip('-')
tag = Tag(name=name, slug=slug)
db.session.add(tag)
tags.append(tag)
db.session.commit()
return tags
def search_articles(self, query, page=1, per_page=10):
"""搜索文章"""
return self.search_service.search_articles(query, page, per_page)
12.4 视图层设计
12.4.1 主页视图
# app/views/main.py
from flask import Blueprint, render_template, request, jsonify, current_app
from flask_login import current_user
from app.services.article_service import ArticleService
from app.services.cache_service import CacheService
from app.models.article import Article, Category, Tag
from app.utils.pagination import Pagination
main_bp = Blueprint('main', __name__)
article_service = ArticleService()
cache_service = CacheService()
@main_bp.route('/')
def index():
"""首页"""
page = request.args.get('page', 1, type=int)
per_page = current_app.config.get('ARTICLES_PER_PAGE', 10)
# 获取文章列表
pagination = article_service.get_articles(
page=page,
per_page=per_page,
published_only=True
)
# 获取热门文章
popular_articles = article_service.get_popular_articles(limit=5)
# 获取分类和标签
categories = Category.query.all()
tags = Tag.query.limit(20).all()
return render_template(
'main/index.html',
pagination=pagination,
popular_articles=popular_articles,
categories=categories,
tags=tags
)
@main_bp.route('/article/<slug>')
def article_detail(slug):
"""文章详情"""
article = article_service.get_article_by_slug(slug, increment_view=True)
if not article:
abort(404)
# 获取相关文章
related_articles = article_service.get_related_articles(article, limit=5)
# 获取评论
comments = article.published_comments.order_by('created_at').all()
return render_template(
'articles/detail.html',
article=article,
related_articles=related_articles,
comments=comments
)
@main_bp.route('/category/<slug>')
def category_articles(slug):
"""分类文章列表"""
category = Category.query.filter_by(slug=slug).first_or_404()
page = request.args.get('page', 1, type=int)
per_page = current_app.config.get('ARTICLES_PER_PAGE', 10)
pagination = article_service.get_articles(
page=page,
per_page=per_page,
category_id=category.id
)
return render_template(
'articles/category.html',
category=category,
pagination=pagination
)
@main_bp.route('/tag/<slug>')
def tag_articles(slug):
"""标签文章列表"""
tag = Tag.query.filter_by(slug=slug).first_or_404()
page = request.args.get('page', 1, type=int)
per_page = current_app.config.get('ARTICLES_PER_PAGE', 10)
pagination = article_service.get_articles(
page=page,
per_page=per_page,
tag_id=tag.id
)
return render_template(
'articles/tag.html',
tag=tag,
pagination=pagination
)
@main_bp.route('/search')
def search():
"""搜索"""
query = request.args.get('q', '').strip()
page = request.args.get('page', 1, type=int)
per_page = current_app.config.get('ARTICLES_PER_PAGE', 10)
if not query:
return render_template('main/search.html', query=query)
# 使用搜索服务
results = article_service.search_articles(query, page, per_page)
return render_template(
'main/search.html',
query=query,
results=results
)
@main_bp.route('/about')
def about():
"""关于页面"""
return render_template('main/about.html')
@main_bp.route('/contact')
def contact():
"""联系页面"""
return render_template('main/contact.html')
12.4.2 文章管理视图
# app/views/article.py
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify, abort
from flask_login import login_required, current_user
from app.services.article_service import ArticleService
from app.forms.article import ArticleForm, CommentForm
from app.models.article import Article
from app.models.comment import Comment
from app import db
article_bp = Blueprint('article', __name__, url_prefix='/articles')
article_service = ArticleService()
@article_bp.route('/create', methods=['GET', 'POST'])
@login_required
def create():
"""创建文章"""
form = ArticleForm()
if form.validate_on_submit():
try:
article = article_service.create_article(
author=current_user,
title=form.title.data,
content=form.content.data,
summary=form.summary.data,
category_name=form.category.data,
tag_names=form.tags.data.split(',') if form.tags.data else [],
is_featured=form.is_featured.data,
allow_comments=form.allow_comments.data
)
# 如果选择发布
if form.publish.data:
article_service.publish_article(article)
flash('文章已发布!', 'success')
else:
flash('文章已保存为草稿!', 'info')
return redirect(url_for('article.detail', slug=article.slug))
except Exception as e:
flash(f'创建文章失败:{str(e)}', 'error')
return render_template('articles/create.html', form=form)
@article_bp.route('/<slug>/edit', methods=['GET', 'POST'])
@login_required
def edit(slug):
"""编辑文章"""
article = Article.query.filter_by(slug=slug).first_or_404()
# 检查权限
if article.author != current_user and not current_user.is_admin:
abort(403)
form = ArticleForm(obj=article)
if form.validate_on_submit():
try:
article_service.update_article(
article,
title=form.title.data,
content=form.content.data,
summary=form.summary.data,
category_name=form.category.data,
tag_names=form.tags.data.split(',') if form.tags.data else [],
is_featured=form.is_featured.data,
allow_comments=form.allow_comments.data
)
# 处理发布状态
if form.publish.data and not article.is_published:
article_service.publish_article(article)
flash('文章已发布!', 'success')
elif not form.publish.data and article.is_published:
article_service.unpublish_article(article)
flash('文章已取消发布!', 'info')
else:
flash('文章已更新!', 'success')
return redirect(url_for('article.detail', slug=article.slug))
except Exception as e:
flash(f'更新文章失败:{str(e)}', 'error')
return render_template('articles/edit.html', form=form, article=article)
@article_bp.route('/<slug>/delete', methods=['POST'])
@login_required
def delete(slug):
"""删除文章"""
article = Article.query.filter_by(slug=slug).first_or_404()
# 检查权限
if article.author != current_user and not current_user.is_admin:
abort(403)
try:
article_service.delete_article(article)
flash('文章已删除!', 'success')
except Exception as e:
flash(f'删除文章失败:{str(e)}', 'error')
return redirect(url_for('main.index'))
@article_bp.route('/<slug>/comment', methods=['POST'])
@login_required
def add_comment(slug):
"""添加评论"""
article = Article.query.filter_by(slug=slug, is_published=True).first_or_404()
if not article.allow_comments:
flash('该文章不允许评论', 'warning')
return redirect(url_for('main.article_detail', slug=slug))
form = CommentForm()
if form.validate_on_submit():
comment = Comment(
content=form.content.data,
author=current_user,
article=article,
parent_id=form.parent_id.data if form.parent_id.data else None
)
# 自动批准已验证用户的评论
if current_user.is_verified:
comment.approve()
db.session.add(comment)
db.session.commit()
flash('评论已提交!', 'success')
return redirect(url_for('main.article_detail', slug=slug))
@article_bp.route('/my-articles')
@login_required
def my_articles():
"""我的文章"""
page = request.args.get('page', 1, type=int)
per_page = current_app.config.get('ARTICLES_PER_PAGE', 10)
pagination = article_service.get_articles(
page=page,
per_page=per_page,
author_id=current_user.id,
published_only=False
)
return render_template('articles/my_articles.html', pagination=pagination)
12.4.3 用户认证视图
# app/views/auth.py
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app
from flask_login import login_user, logout_user, login_required, current_user
from app.services.auth_service import AuthService
from app.forms.auth import LoginForm, RegisterForm, ResetPasswordForm, ChangePasswordForm
from app.models.user import User
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')
auth_service = AuthService()
@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
"""用户登录"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = LoginForm()
if form.validate_on_submit():
try:
user = auth_service.authenticate_user(
username_or_email=form.username_or_email.data,
password=form.password.data,
remember_me=form.remember_me.data
)
flash(f'欢迎回来,{user.username}!', 'success')
# 重定向到原来要访问的页面
next_page = request.args.get('next')
if next_page:
return redirect(next_page)
return redirect(url_for('main.index'))
except ValueError as e:
flash(str(e), 'error')
return render_template('auth/login.html', form=form)
@auth_bp.route('/register', methods=['GET', 'POST'])
def register():
"""用户注册"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = RegisterForm()
if form.validate_on_submit():
try:
user = auth_service.register_user(
username=form.username.data,
email=form.email.data,
password=form.password.data,
first_name=form.first_name.data,
last_name=form.last_name.data
)
flash('注册成功!请检查您的邮箱以验证账户。', 'success')
return redirect(url_for('auth.login'))
except ValueError as e:
flash(str(e), 'error')
return render_template('auth/register.html', form=form)
@auth_bp.route('/logout')
@login_required
def logout():
"""用户登出"""
auth_service.logout_user()
flash('您已成功登出。', 'info')
return redirect(url_for('main.index'))
@auth_bp.route('/verify-email/<token>')
def verify_email(token):
"""验证邮箱"""
try:
user = auth_service.verify_email(token)
flash('邮箱验证成功!', 'success')
login_user(user)
return redirect(url_for('main.index'))
except ValueError as e:
flash(str(e), 'error')
return redirect(url_for('auth.login'))
@auth_bp.route('/reset-password', methods=['GET', 'POST'])
def reset_password_request():
"""请求重置密码"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = ResetPasswordForm()
if form.validate_on_submit():
try:
auth_service.send_password_reset_email(form.email.data)
flash('密码重置邮件已发送,请检查您的邮箱。', 'info')
return redirect(url_for('auth.login'))
except ValueError as e:
flash(str(e), 'error')
return render_template('auth/reset_password_request.html', form=form)
@auth_bp.route('/reset-password/<token>', methods=['GET', 'POST'])
def reset_password(token):
"""重置密码"""
if current_user.is_authenticated:
return redirect(url_for('main.index'))
form = ChangePasswordForm()
if form.validate_on_submit():
try:
user = auth_service.reset_password(token, form.password.data)
flash('密码重置成功!', 'success')
return redirect(url_for('auth.login'))
except ValueError as e:
flash(str(e), 'error')
return render_template('auth/reset_password.html', form=form)
@auth_bp.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
"""用户资料"""
form = ProfileForm(obj=current_user)
if form.validate_on_submit():
try:
auth_service.update_profile(
current_user,
first_name=form.first_name.data,
last_name=form.last_name.data,
bio=form.bio.data,
website=form.website.data,
location=form.location.data
)
flash('资料更新成功!', 'success')
except Exception as e:
flash(f'更新失败:{str(e)}', 'error')
return render_template('auth/profile.html', form=form)
@auth_bp.route('/change-password', methods=['GET', 'POST'])
@login_required
def change_password():
"""修改密码"""
form = ChangePasswordForm()
if form.validate_on_submit():
try:
auth_service.change_password(
current_user,
old_password=form.old_password.data,
new_password=form.password.data
)
flash('密码修改成功!', 'success')
return redirect(url_for('auth.profile'))
except ValueError as e:
flash(str(e), 'error')
return render_template('auth/change_password.html', form=form)
12.5 表单设计
12.5.1 文章表单
# app/forms/article.py
from flask_wtf import FlaskForm
from wtforms import StringField, TextAreaField, BooleanField, SubmitField, HiddenField
from wtforms.validators import DataRequired, Length, Optional
from wtforms.widgets import TextArea
from flask_wtf.file import FileField, FileAllowed
class CKEditorWidget(TextArea):
"""CKEditor富文本编辑器组件"""
def __call__(self, field, **kwargs):
kwargs.setdefault('class_', 'ckeditor')
return super(CKEditorWidget, self).__call__(field, **kwargs)
class ArticleForm(FlaskForm):
"""文章表单"""
title = StringField(
'标题',
validators=[
DataRequired(message='标题不能为空'),
Length(min=1, max=200, message='标题长度必须在1-200字符之间')
],
render_kw={'placeholder': '请输入文章标题'}
)
summary = TextAreaField(
'摘要',
validators=[
Optional(),
Length(max=500, message='摘要不能超过500字符')
],
render_kw={
'placeholder': '请输入文章摘要(可选)',
'rows': 3
}
)
content = TextAreaField(
'内容',
validators=[
DataRequired(message='内容不能为空')
],
widget=CKEditorWidget(),
render_kw={'placeholder': '请输入文章内容'}
)
category = StringField(
'分类',
validators=[
Optional(),
Length(max=100, message='分类名称不能超过100字符')
],
render_kw={'placeholder': '请输入分类名称'}
)
tags = StringField(
'标签',
validators=[Optional()],
render_kw={
'placeholder': '请输入标签,用逗号分隔',
'data-role': 'tagsinput'
}
)
featured_image = FileField(
'特色图片',
validators=[
Optional(),
FileAllowed(['jpg', 'jpeg', 'png', 'gif'], '只支持图片格式')
]
)
is_featured = BooleanField('设为推荐文章')
allow_comments = BooleanField('允许评论', default=True)
# 操作按钮
save_draft = SubmitField('保存草稿')
publish = SubmitField('发布文章')
update = SubmitField('更新文章')
def validate(self):
"""自定义验证"""
if not super().validate():
return False
# 验证标签格式
if self.tags.data:
tags = [tag.strip() for tag in self.tags.data.split(',')]
if len(tags) > 10:
self.tags.errors.append('标签数量不能超过10个')
return False
for tag in tags:
if len(tag) > 50:
self.tags.errors.append('单个标签长度不能超过50字符')
return False
return True
class CommentForm(FlaskForm):
"""评论表单"""
content = TextAreaField(
'评论内容',
validators=[
DataRequired(message='评论内容不能为空'),
Length(min=1, max=1000, message='评论长度必须在1-1000字符之间')
],
render_kw={
'placeholder': '请输入您的评论...',
'rows': 4
}
)
parent_id = HiddenField('父评论ID')
submit = SubmitField('发表评论')
class SearchForm(FlaskForm):
"""搜索表单"""
query = StringField(
'搜索',
validators=[
DataRequired(message='搜索关键词不能为空'),
Length(min=1, max=100, message='搜索关键词长度必须在1-100字符之间')
],
render_kw={'placeholder': '搜索文章...'}
)
submit = SubmitField('搜索')
12.5.2 认证表单
# app/forms/auth.py
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField, TextAreaField
from wtforms.validators import DataRequired, Length, Email, EqualTo, Optional, URL
from wtforms.widgets import TextArea
from app.models.user import User
class LoginForm(FlaskForm):
"""登录表单"""
username_or_email = StringField(
'用户名或邮箱',
validators=[
DataRequired(message='用户名或邮箱不能为空'),
Length(min=1, max=120, message='长度必须在1-120字符之间')
],
render_kw={'placeholder': '请输入用户名或邮箱'}
)
password = PasswordField(
'密码',
validators=[
DataRequired(message='密码不能为空')
],
render_kw={'placeholder': '请输入密码'}
)
remember_me = BooleanField('记住我')
submit = SubmitField('登录')
class RegisterForm(FlaskForm):
"""注册表单"""
username = StringField(
'用户名',
validators=[
DataRequired(message='用户名不能为空'),
Length(min=3, max=80, message='用户名长度必须在3-80字符之间')
],
render_kw={'placeholder': '请输入用户名'}
)
email = StringField(
'邮箱',
validators=[
DataRequired(message='邮箱不能为空'),
Email(message='请输入有效的邮箱地址'),
Length(max=120, message='邮箱长度不能超过120字符')
],
render_kw={'placeholder': '请输入邮箱地址'}
)
first_name = StringField(
'名',
validators=[
Optional(),
Length(max=50, message='名字长度不能超过50字符')
],
render_kw={'placeholder': '请输入名字(可选)'}
)
last_name = StringField(
'姓',
validators=[
Optional(),
Length(max=50, message='姓氏长度不能超过50字符')
],
render_kw={'placeholder': '请输入姓氏(可选)'}
)
password = PasswordField(
'密码',
validators=[
DataRequired(message='密码不能为空'),
Length(min=6, max=128, message='密码长度必须在6-128字符之间')
],
render_kw={'placeholder': '请输入密码'}
)
password_confirm = PasswordField(
'确认密码',
validators=[
DataRequired(message='请确认密码'),
EqualTo('password', message='两次输入的密码不一致')
],
render_kw={'placeholder': '请再次输入密码'}
)
submit = SubmitField('注册')
def validate_username(self, field):
"""验证用户名唯一性"""
if User.query.filter_by(username=field.data).first():
raise ValidationError('用户名已存在')
def validate_email(self, field):
"""验证邮箱唯一性"""
if User.query.filter_by(email=field.data).first():
raise ValidationError('邮箱已被注册')
class ResetPasswordForm(FlaskForm):
"""重置密码请求表单"""
email = StringField(
'邮箱',
validators=[
DataRequired(message='邮箱不能为空'),
Email(message='请输入有效的邮箱地址')
],
render_kw={'placeholder': '请输入注册时使用的邮箱'}
)
submit = SubmitField('发送重置邮件')
class ChangePasswordForm(FlaskForm):
"""修改密码表单"""
old_password = PasswordField(
'当前密码',
validators=[
DataRequired(message='当前密码不能为空')
],
render_kw={'placeholder': '请输入当前密码'}
)
password = PasswordField(
'新密码',
validators=[
DataRequired(message='新密码不能为空'),
Length(min=6, max=128, message='密码长度必须在6-128字符之间')
],
render_kw={'placeholder': '请输入新密码'}
)
password_confirm = PasswordField(
'确认新密码',
validators=[
DataRequired(message='请确认新密码'),
EqualTo('password', message='两次输入的密码不一致')
],
render_kw={'placeholder': '请再次输入新密码'}
)
submit = SubmitField('修改密码')
class ProfileForm(FlaskForm):
"""用户资料表单"""
first_name = StringField(
'名',
validators=[
Optional(),
Length(max=50, message='名字长度不能超过50字符')
],
render_kw={'placeholder': '请输入名字'}
)
last_name = StringField(
'姓',
validators=[
Optional(),
Length(max=50, message='姓氏长度不能超过50字符')
],
render_kw={'placeholder': '请输入姓氏'}
)
bio = TextAreaField(
'个人简介',
validators=[
Optional(),
Length(max=500, message='个人简介不能超过500字符')
],
render_kw={
'placeholder': '请输入个人简介',
'rows': 4
}
)
website = StringField(
'个人网站',
validators=[
Optional(),
URL(message='请输入有效的网址'),
Length(max=255, message='网址长度不能超过255字符')
],
render_kw={'placeholder': 'https://example.com'}
)
location = StringField(
'所在地',
validators=[
Optional(),
Length(max=100, message='所在地长度不能超过100字符')
],
render_kw={'placeholder': '请输入所在地'}
)
submit = SubmitField('更新资料')
12.6 前端模板设计
12.6.1 基础模板
<!-- app/templates/base.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}Flask博客系统{% endblock %}</title>
<!-- Bootstrap CSS -->
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" rel="stylesheet">
<!-- Font Awesome -->
<link href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.0.0/css/all.min.css" rel="stylesheet">
<!-- CKEditor -->
<script src="https://cdn.ckeditor.com/ckeditor5/35.0.1/classic/ckeditor.js"></script>
<!-- 自定义样式 -->
<link href="{{ url_for('static', filename='css/style.css') }}" rel="stylesheet">
{% block head %}{% endblock %}
</head>
<body>
<!-- 导航栏 -->
<nav class="navbar navbar-expand-lg navbar-dark bg-primary">
<div class="container">
<a class="navbar-brand" href="{{ url_for('main.index') }}">
<i class="fas fa-blog"></i> Flask博客
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarNav">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarNav">
<ul class="navbar-nav me-auto">
<li class="nav-item">
<a class="nav-link" href="{{ url_for('main.index') }}">首页</a>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" id="categoriesDropdown" role="button" data-bs-toggle="dropdown">
分类
</a>
<ul class="dropdown-menu">
{% for category in g.categories %}
<li><a class="dropdown-item" href="{{ url_for('main.category_articles', slug=category.slug) }}">{{ category.name }}</a></li>
{% endfor %}
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="{{ url_for('main.about') }}">关于</a>
</li>
<li class="nav-item">
<a class="nav-link" href="{{ url_for('main.contact') }}">联系</a>
</li>
</ul>
<!-- 搜索框 -->
<form class="d-flex me-3" method="GET" action="{{ url_for('main.search') }}">
<input class="form-control me-2" type="search" name="q" placeholder="搜索文章..." value="{{ request.args.get('q', '') }}">
<button class="btn btn-outline-light" type="submit">
<i class="fas fa-search"></i>
</button>
</form>
<!-- 用户菜单 -->
<ul class="navbar-nav">
{% if current_user.is_authenticated %}
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" id="userDropdown" role="button" data-bs-toggle="dropdown">
<i class="fas fa-user"></i> {{ current_user.username }}
</a>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="{{ url_for('auth.profile') }}">个人资料</a></li>
<li><a class="dropdown-item" href="{{ url_for('article.my_articles') }}">我的文章</a></li>
<li><a class="dropdown-item" href="{{ url_for('article.create') }}">写文章</a></li>
<li><hr class="dropdown-divider"></li>
<li><a class="dropdown-item" href="{{ url_for('auth.logout') }}">登出</a></li>
</ul>
</li>
{% else %}
<li class="nav-item">
<a class="nav-link" href="{{ url_for('auth.login') }}">登录</a>
</li>
<li class="nav-item">
<a class="nav-link" href="{{ url_for('auth.register') }}">注册</a>
</li>
{% endif %}
</ul>
</div>
</div>
</nav>
<!-- 消息提示 -->
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
<div class="container mt-3">
{% for category, message in messages %}
<div class="alert alert-{{ 'danger' if category == 'error' else category }} alert-dismissible fade show" role="alert">
{{ message }}
<button type="button" class="btn-close" data-bs-dismiss="alert"></button>
</div>
{% endfor %}
</div>
{% endif %}
{% endwith %}
<!-- 主要内容 -->
<main class="container my-4">
{% block content %}{% endblock %}
</main>
<!-- 页脚 -->
<footer class="bg-dark text-light py-4 mt-5">
<div class="container">
<div class="row">
<div class="col-md-6">
<h5>Flask博客系统</h5>
<p>基于Flask构建的现代化博客平台</p>
</div>
<div class="col-md-6">
<h5>快速链接</h5>
<ul class="list-unstyled">
<li><a href="{{ url_for('main.about') }}" class="text-light">关于我们</a></li>
<li><a href="{{ url_for('main.contact') }}" class="text-light">联系我们</a></li>
<li><a href="#" class="text-light">隐私政策</a></li>
</ul>
</div>
</div>
<hr>
<div class="text-center">
<p>© 2024 Flask博客系统. 保留所有权利.</p>
</div>
</div>
</footer>
<!-- JavaScript -->
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/js/bootstrap.bundle.min.js"></script>
<script src="https://unpkg.com/htmx.org@1.8.4"></script>
<script src="{{ url_for('static', filename='js/app.js') }}"></script>
{% block scripts %}{% endblock %}
</body>
</html>
12.6.2 首页模板
<!-- app/templates/main/index.html -->
{% extends "base.html" %}
{% block title %}首页 - Flask博客系统{% endblock %}
{% block content %}
<div class="row">
<!-- 主要内容区域 -->
<div class="col-lg-8">
<h1 class="mb-4">最新文章</h1>
{% if pagination.items %}
{% for article in pagination.items %}
<article class="card mb-4">
{% if article.featured_image %}
<img src="{{ article.featured_image }}" class="card-img-top" alt="{{ article.title }}">
{% endif %}
<div class="card-body">
<h2 class="card-title">
<a href="{{ url_for('main.article_detail', slug=article.slug) }}" class="text-decoration-none">
{{ article.title }}
</a>
{% if article.is_featured %}
<span class="badge bg-warning text-dark ms-2">推荐</span>
{% endif %}
</h2>
<div class="card-text text-muted mb-2">
<small>
<i class="fas fa-user"></i> {{ article.author.username }}
<i class="fas fa-calendar ms-2"></i> {{ article.published_at.strftime('%Y-%m-%d') }}
<i class="fas fa-eye ms-2"></i> {{ article.view_count }}
{% if article.category %}
<i class="fas fa-folder ms-2"></i>
<a href="{{ url_for('main.category_articles', slug=article.category.slug) }}" class="text-muted">
{{ article.category.name }}
</a>
{% endif %}
</small>
</div>
<p class="card-text">{{ article.get_summary() }}</p>
<div class="d-flex justify-content-between align-items-center">
<div>
{% for tag in article.tags[:3] %}
<a href="{{ url_for('main.tag_articles', slug=tag.slug) }}" class="badge bg-secondary text-decoration-none me-1">
{{ tag.name }}
</a>
{% endfor %}
</div>
<a href="{{ url_for('main.article_detail', slug=article.slug) }}" class="btn btn-primary btn-sm">
阅读更多 <i class="fas fa-arrow-right"></i>
</a>
</div>
</div>
</article>
{% endfor %}
<!-- 分页 -->
{% if pagination.pages > 1 %}
<nav aria-label="文章分页">
<ul class="pagination justify-content-center">
{% if pagination.has_prev %}
<li class="page-item">
<a class="page-link" href="{{ url_for('main.index', page=pagination.prev_num) }}">
<i class="fas fa-chevron-left"></i> 上一页
</a>
</li>
{% endif %}
{% for page_num in pagination.iter_pages() %}
{% if page_num %}
{% if page_num != pagination.page %}
<li class="page-item">
<a class="page-link" href="{{ url_for('main.index', page=page_num) }}">{{ page_num }}</a>
</li>
{% else %}
<li class="page-item active">
<span class="page-link">{{ page_num }}</span>
</li>
{% endif %}
{% else %}
<li class="page-item disabled">
<span class="page-link">...</span>
</li>
{% endif %}
{% endfor %}
{% if pagination.has_next %}
<li class="page-item">
<a class="page-link" href="{{ url_for('main.index', page=pagination.next_num) }}">
下一页 <i class="fas fa-chevron-right"></i>
</a>
</li>
{% endif %}
</ul>
</nav>
{% endif %}
{% else %}
<div class="text-center py-5">
<i class="fas fa-file-alt fa-3x text-muted mb-3"></i>
<h3 class="text-muted">暂无文章</h3>
<p class="text-muted">还没有发布任何文章</p>
</div>
{% endif %}
</div>
<!-- 侧边栏 -->
<div class="col-lg-4">
<!-- 热门文章 -->
{% if popular_articles %}
<div class="card mb-4">
<div class="card-header">
<h5 class="mb-0"><i class="fas fa-fire"></i> 热门文章</h5>
</div>
<div class="card-body">
{% for article in popular_articles %}
<div class="d-flex mb-3">
<div class="flex-shrink-0">
<span class="badge bg-primary rounded-pill">{{ loop.index }}</span>
</div>
<div class="flex-grow-1 ms-3">
<h6 class="mb-1">
<a href="{{ url_for('main.article_detail', slug=article.slug) }}" class="text-decoration-none">
{{ article.title[:50] }}{% if article.title|length > 50 %}...{% endif %}
</a>
</h6>
<small class="text-muted">
<i class="fas fa-eye"></i> {{ article.view_count }}
</small>
</div>
</div>
{% endfor %}
</div>
</div>
{% endif %}
<!-- 分类 -->
{% if categories %}
<div class="card mb-4">
<div class="card-header">
<h5 class="mb-0"><i class="fas fa-folder"></i> 分类</h5>
</div>
<div class="card-body">
{% for category in categories %}
<a href="{{ url_for('main.category_articles', slug=category.slug) }}" class="btn btn-outline-secondary btn-sm me-2 mb-2">
{{ category.name }} ({{ category.articles.count() }})
</a>
{% endfor %}
</div>
</div>
{% endif %}
<!-- 标签云 -->
{% if tags %}
<div class="card mb-4">
<div class="card-header">
<h5 class="mb-0"><i class="fas fa-tags"></i> 标签</h5>
</div>
<div class="card-body">
{% for tag in tags %}
<a href="{{ url_for('main.tag_articles', slug=tag.slug) }}" class="badge bg-light text-dark text-decoration-none me-1 mb-1">
{{ tag.name }}
</a>
{% endfor %}
</div>
</div>
{% endif %}
</div>
</div>
{% endblock %}
12.6.3 文章详情模板
<!-- app/templates/articles/detail.html -->
{% extends "base.html" %}
{% block title %}{{ article.title }} - Flask博客系统{% endblock %}
{% block head %}
<meta name="description" content="{{ article.get_summary() }}">
<meta property="og:title" content="{{ article.title }}">
<meta property="og:description" content="{{ article.get_summary() }}">
{% if article.featured_image %}
<meta property="og:image" content="{{ article.featured_image }}">
{% endif %}
{% endblock %}
{% block content %}
<div class="row">
<div class="col-lg-8">
<article class="mb-4">
<!-- 文章头部 -->
<header class="mb-4">
<h1 class="display-5 fw-bold">{{ article.title }}</h1>
<div class="text-muted mb-3">
<i class="fas fa-user"></i>
<a href="#" class="text-muted text-decoration-none">{{ article.author.get_full_name() or article.author.username }}</a>
<i class="fas fa-calendar ms-3"></i> {{ article.published_at.strftime('%Y年%m月%d日') }}
<i class="fas fa-clock ms-3"></i> 约{{ article.get_reading_time() }}分钟阅读
<i class="fas fa-eye ms-3"></i> {{ article.view_count }}次浏览
</div>
{% if article.category %}
<div class="mb-3">
<i class="fas fa-folder"></i>
<a href="{{ url_for('main.category_articles', slug=article.category.slug) }}" class="badge bg-primary text-decoration-none">
{{ article.category.name }}
</a>
</div>
{% endif %}
{% if article.tags %}
<div class="mb-3">
<i class="fas fa-tags"></i>
{% for tag in article.tags %}
<a href="{{ url_for('main.tag_articles', slug=tag.slug) }}" class="badge bg-secondary text-decoration-none me-1">
{{ tag.name }}
</a>
{% endfor %}
</div>
{% endif %}
</header>
<!-- 特色图片 -->
{% if article.featured_image %}
<div class="mb-4">
<img src="{{ article.featured_image }}" class="img-fluid rounded" alt="{{ article.title }}">
</div>
{% endif %}
<!-- 文章内容 -->
<div class="article-content">
{{ article.content|safe }}
</div>
<!-- 文章操作 -->
{% if current_user.is_authenticated and (current_user == article.author or current_user.is_admin) %}
<div class="mt-4 pt-4 border-top">
<a href="{{ url_for('article.edit', slug=article.slug) }}" class="btn btn-outline-primary btn-sm">
<i class="fas fa-edit"></i> 编辑
</a>
<button type="button" class="btn btn-outline-danger btn-sm" data-bs-toggle="modal" data-bs-target="#deleteModal">
<i class="fas fa-trash"></i> 删除
</button>
</div>
<!-- 删除确认模态框 -->
<div class="modal fade" id="deleteModal" tabindex="-1">
<div class="modal-dialog">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">确认删除</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal"></button>
</div>
<div class="modal-body">
确定要删除文章《{{ article.title }}》吗?此操作不可撤销。
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">取消</button>
<form method="POST" action="{{ url_for('article.delete', slug=article.slug) }}" class="d-inline">
<button type="submit" class="btn btn-danger">确认删除</button>
</form>
</div>
</div>
</div>
</div>
{% endif %}
</article>
<!-- 评论区 -->
{% if article.allow_comments %}
<section class="mt-5">
<h3>评论 ({{ comments|length }})</h3>
<!-- 发表评论 -->
{% if current_user.is_authenticated %}
<div class="card mb-4">
<div class="card-body">
<h5 class="card-title">发表评论</h5>
<form method="POST" action="{{ url_for('article.add_comment', slug=article.slug) }}">
{{ comment_form.hidden_tag() }}
<div class="mb-3">
{{ comment_form.content.label(class="form-label") }}
{{ comment_form.content(class="form-control") }}
{% for error in comment_form.content.errors %}
<div class="text-danger small">{{ error }}</div>
{% endfor %}
</div>
{{ comment_form.submit(class="btn btn-primary") }}
</form>
</div>
</div>
{% else %}
<div class="alert alert-info">
<i class="fas fa-info-circle"></i>
请 <a href="{{ url_for('auth.login') }}">登录</a> 后发表评论。
</div>
{% endif %}
<!-- 评论列表 -->
{% if comments %}
<div class="comments-list">
{% for comment in comments %}
<div class="card mb-3">
<div class="card-body">
<div class="d-flex justify-content-between align-items-start mb-2">
<div>
<strong>{{ comment.author.get_full_name() or comment.author.username }}</strong>
<small class="text-muted ms-2">
<i class="fas fa-clock"></i> {{ comment.created_at.strftime('%Y-%m-%d %H:%M') }}
</small>
</div>
{% if comment.status == 'pending' %}
<span class="badge bg-warning">待审核</span>
{% endif %}
</div>
<p class="mb-0">{{ comment.content }}</p>
</div>
</div>
{% endfor %}
</div>
{% else %}
<div class="text-center py-4">
<i class="fas fa-comments fa-2x text-muted mb-2"></i>
<p class="text-muted">暂无评论,快来发表第一个评论吧!</p>
</div>
{% endif %}
</section>
{% endif %}
</div>
<!-- 侧边栏 -->
<div class="col-lg-4">
<!-- 作者信息 -->
<div class="card mb-4">
<div class="card-header">
<h5 class="mb-0"><i class="fas fa-user"></i> 作者信息</h5>
</div>
<div class="card-body text-center">
<div class="mb-3">
<img src="{{ article.author.avatar or url_for('static', filename='images/default-avatar.png') }}"
class="rounded-circle" width="80" height="80" alt="{{ article.author.username }}">
</div>
<h6>{{ article.author.get_full_name() or article.author.username }}</h6>
{% if article.author.bio %}
<p class="text-muted small">{{ article.author.bio }}</p>
{% endif %}
<div class="text-muted small">
<i class="fas fa-calendar"></i> 加入于 {{ article.author.created_at.strftime('%Y年%m月') }}
</div>
</div>
</div>
<!-- 相关文章 -->
{% if related_articles %}
<div class="card mb-4">
<div class="card-header">
<h5 class="mb-0"><i class="fas fa-newspaper"></i> 相关文章</h5>
</div>
<div class="card-body">
{% for related in related_articles %}
<div class="mb-3">
<h6 class="mb-1">
<a href="{{ url_for('main.article_detail', slug=related.slug) }}" class="text-decoration-none">
{{ related.title[:40] }}{% if related.title|length > 40 %}...{% endif %}
</a>
</h6>
<small class="text-muted">
<i class="fas fa-calendar"></i> {{ related.published_at.strftime('%m-%d') }}
<i class="fas fa-eye ms-2"></i> {{ related.view_count }}
</small>
</div>
{% endfor %}
</div>
</div>
{% endif %}
<!-- 目录 -->
<div class="card mb-4">
<div class="card-header">
<h5 class="mb-0"><i class="fas fa-list"></i> 文章目录</h5>
</div>
<div class="card-body">
<div id="toc"></div>
</div>
</div>
</div>
</div>
{% endblock %}
{% block scripts %}
<script>
// 生成文章目录
document.addEventListener('DOMContentLoaded', function() {
const content = document.querySelector('.article-content');
const toc = document.getElementById('toc');
const headings = content.querySelectorAll('h1, h2, h3, h4, h5, h6');
if (headings.length > 0) {
const tocList = document.createElement('ul');
tocList.className = 'list-unstyled';
headings.forEach((heading, index) => {
const id = `heading-${index}`;
heading.id = id;
const li = document.createElement('li');
li.className = `toc-${heading.tagName.toLowerCase()}`;
const a = document.createElement('a');
a.href = `#${id}`;
a.textContent = heading.textContent;
a.className = 'text-decoration-none';
li.appendChild(a);
tocList.appendChild(li);
});
toc.appendChild(tocList);
} else {
toc.innerHTML = '<p class="text-muted small">本文暂无目录</p>';
}
});
</script>
{% endblock %}
12.7 API接口设计
12.7.1 RESTful API结构
# app/api/__init__.py
from flask import Blueprint
from flask_restful import Api
from app.api.resources.auth import AuthResource, LoginResource, RegisterResource
from app.api.resources.articles import ArticleListResource, ArticleResource
from app.api.resources.comments import CommentListResource, CommentResource
from app.api.resources.users import UserListResource, UserResource
api_bp = Blueprint('api', __name__, url_prefix='/api/v1')
api = Api(api_bp)
# 认证相关
api.add_resource(LoginResource, '/auth/login')
api.add_resource(RegisterResource, '/auth/register')
api.add_resource(AuthResource, '/auth/me')
# 文章相关
api.add_resource(ArticleListResource, '/articles')
api.add_resource(ArticleResource, '/articles/<string:slug>')
# 评论相关
api.add_resource(CommentListResource, '/articles/<string:article_slug>/comments')
api.add_resource(CommentResource, '/comments/<int:comment_id>')
# 用户相关
api.add_resource(UserListResource, '/users')
api.add_resource(UserResource, '/users/<int:user_id>')
12.7.2 文章API资源
# app/api/resources/articles.py
from flask import request, current_app
from flask_restful import Resource
from flask_login import login_required, current_user
from app.services.article_service import ArticleService
from app.api.decorators import api_login_required, api_permission_required
from app.api.schemas import ArticleSchema, ArticleListSchema
from app.utils.response import success_response, error_response
from marshmallow import ValidationError
article_service = ArticleService()
article_schema = ArticleSchema()
article_list_schema = ArticleListSchema()
class ArticleListResource(Resource):
"""文章列表API"""
def get(self):
"""获取文章列表"""
try:
# 获取查询参数
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
category_id = request.args.get('category_id', type=int)
tag_id = request.args.get('tag_id', type=int)
author_id = request.args.get('author_id', type=int)
search = request.args.get('search')
# 获取文章列表
if search:
pagination = article_service.search_articles(
query=search,
page=page,
per_page=per_page
)
else:
pagination = article_service.get_articles(
page=page,
per_page=per_page,
category_id=category_id,
tag_id=tag_id,
author_id=author_id,
published_only=True
)
# 序列化数据
articles_data = article_list_schema.dump(pagination.items, many=True)
return success_response(
data={
'articles': articles_data,
'pagination': {
'page': pagination.page,
'pages': pagination.pages,
'per_page': pagination.per_page,
'total': pagination.total,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev
}
}
)
except Exception as e:
return error_response(message=str(e))
@api_login_required
def post(self):
"""创建文章"""
try:
# 验证数据
json_data = request.get_json()
if not json_data:
return error_response(message='请提供有效的JSON数据')
data = article_schema.load(json_data)
# 创建文章
article = article_service.create_article(
author=current_user,
title=data['title'],
content=data['content'],
summary=data.get('summary'),
category_name=data.get('category'),
tag_names=data.get('tags', []),
is_featured=data.get('is_featured', False),
allow_comments=data.get('allow_comments', True)
)
# 处理发布状态
if data.get('publish', False):
article_service.publish_article(article)
return success_response(
data=article_schema.dump(article),
message='文章创建成功',
status_code=201
)
except ValidationError as e:
return error_response(message='数据验证失败', errors=e.messages)
except Exception as e:
return error_response(message=str(e))
class ArticleResource(Resource):
"""单个文章API"""
def get(self, slug):
"""获取文章详情"""
try:
article = article_service.get_article_by_slug(slug, increment_view=True)
if not article:
return error_response(message='文章不存在', status_code=404)
return success_response(data=article_schema.dump(article))
except Exception as e:
return error_response(message=str(e))
@api_login_required
def put(self, slug):
"""更新文章"""
try:
article = article_service.get_article_by_slug(slug)
if not article:
return error_response(message='文章不存在', status_code=404)
# 检查权限
if article.author != current_user and not current_user.is_admin:
return error_response(message='无权限操作', status_code=403)
# 验证数据
json_data = request.get_json()
if not json_data:
return error_response(message='请提供有效的JSON数据')
data = article_schema.load(json_data, partial=True)
# 更新文章
article_service.update_article(
article,
title=data.get('title'),
content=data.get('content'),
summary=data.get('summary'),
category_name=data.get('category'),
tag_names=data.get('tags'),
is_featured=data.get('is_featured'),
allow_comments=data.get('allow_comments')
)
# 处理发布状态
if 'publish' in data:
if data['publish'] and not article.is_published:
article_service.publish_article(article)
elif not data['publish'] and article.is_published:
article_service.unpublish_article(article)
return success_response(
data=article_schema.dump(article),
message='文章更新成功'
)
except ValidationError as e:
return error_response(message='数据验证失败', errors=e.messages)
except Exception as e:
return error_response(message=str(e))
@api_login_required
def delete(self, slug):
"""删除文章"""
try:
article = article_service.get_article_by_slug(slug)
if not article:
return error_response(message='文章不存在', status_code=404)
# 检查权限
if article.author != current_user and not current_user.is_admin:
return error_response(message='无权限操作', status_code=403)
# 删除文章
article_service.delete_article(article)
return success_response(message='文章删除成功')
except Exception as e:
return error_response(message=str(e))
12.7.3 API装饰器
# app/api/decorators.py
from functools import wraps
from flask import request, current_app
from flask_login import current_user
from app.utils.response import error_response
from app.models.user import User
import jwt
def api_login_required(f):
"""API登录验证装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
token = None
# 从请求头获取token
if 'Authorization' in request.headers:
auth_header = request.headers['Authorization']
try:
token = auth_header.split(' ')[1] # Bearer <token>
except IndexError:
return error_response(message='无效的Authorization头格式', status_code=401)
if not token:
return error_response(message='缺少访问令牌', status_code=401)
try:
# 验证token
data = jwt.decode(
token,
current_app.config['SECRET_KEY'],
algorithms=['HS256']
)
# 获取用户
user = User.query.get(data['user_id'])
if not user or not user.is_active:
return error_response(message='用户不存在或已被禁用', status_code=401)
# 设置当前用户
request.current_user = user
except jwt.ExpiredSignatureError:
return error_response(message='访问令牌已过期', status_code=401)
except jwt.InvalidTokenError:
return error_response(message='无效的访问令牌', status_code=401)
return f(*args, **kwargs)
return decorated_function
def api_permission_required(permission):
"""API权限验证装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not hasattr(request, 'current_user'):
return error_response(message='未认证用户', status_code=401)
user = request.current_user
# 检查权限
if not user.has_permission(permission):
return error_response(message='权限不足', status_code=403)
return f(*args, **kwargs)
return decorated_function
return decorator
def rate_limit(max_requests=100, per_seconds=3600):
"""API速率限制装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 这里可以实现基于Redis的速率限制
# 简化实现,实际项目中应该使用Redis
return f(*args, **kwargs)
return decorated_function
return decorator
12.8 配置管理
12.8.1 配置类
# config.py
import os
from datetime import timedelta
class Config:
"""基础配置"""
# 基本配置
SECRET_KEY = os.environ.get('SECRET_KEY') or 'your-secret-key-here'
# 数据库配置
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'postgresql://user:password@localhost/flask_blog'
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 10,
'pool_recycle': 120,
'pool_pre_ping': True
}
# Redis配置
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://localhost:6379/0'
# 邮件配置
MAIL_SERVER = os.environ.get('MAIL_SERVER') or 'smtp.gmail.com'
MAIL_PORT = int(os.environ.get('MAIL_PORT') or 587)
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS', 'true').lower() in ['true', 'on', '1']
MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
MAIL_DEFAULT_SENDER = os.environ.get('MAIL_DEFAULT_SENDER')
# 文件上传配置
UPLOAD_FOLDER = os.environ.get('UPLOAD_FOLDER') or 'uploads'
MAX_CONTENT_LENGTH = 16 * 1024 * 1024 # 16MB
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}
# 分页配置
ARTICLES_PER_PAGE = 10
COMMENTS_PER_PAGE = 20
# 缓存配置
CACHE_TYPE = 'redis'
CACHE_REDIS_URL = REDIS_URL
CACHE_DEFAULT_TIMEOUT = 300
# Celery配置
CELERY_BROKER_URL = REDIS_URL
CELERY_RESULT_BACKEND = REDIS_URL
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = 'Asia/Shanghai'
# JWT配置
JWT_SECRET_KEY = SECRET_KEY
JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
# 搜索配置
ELASTICSEARCH_URL = os.environ.get('ELASTICSEARCH_URL') or 'http://localhost:9200'
# 日志配置
LOG_LEVEL = os.environ.get('LOG_LEVEL') or 'INFO'
LOG_FILE = os.environ.get('LOG_FILE') or 'app.log'
# 安全配置
WTF_CSRF_ENABLED = True
WTF_CSRF_TIME_LIMIT = None
# 国际化配置
LANGUAGES = ['zh', 'en']
BABEL_DEFAULT_LOCALE = 'zh'
BABEL_DEFAULT_TIMEZONE = 'Asia/Shanghai'
@staticmethod
def init_app(app):
"""初始化应用配置"""
pass
class DevelopmentConfig(Config):
"""开发环境配置"""
DEBUG = True
SQLALCHEMY_DATABASE_URI = os.environ.get('DEV_DATABASE_URL') or \
'postgresql://user:password@localhost/flask_blog_dev'
# 开发环境使用简单缓存
CACHE_TYPE = 'simple'
# 邮件调试
MAIL_SUPPRESS_SEND = False
@classmethod
def init_app(cls, app):
Config.init_app(app)
# 开发环境日志配置
import logging
from logging.handlers import RotatingFileHandler
if not app.debug and not app.testing:
if not os.path.exists('logs'):
os.mkdir('logs')
file_handler = RotatingFileHandler(
'logs/flask_blog.log',
maxBytes=10240000,
backupCount=10
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('Flask Blog startup')
class TestingConfig(Config):
"""测试环境配置"""
TESTING = True
SQLALCHEMY_DATABASE_URI = os.environ.get('TEST_DATABASE_URL') or \
'postgresql://user:password@localhost/flask_blog_test'
# 测试环境禁用CSRF
WTF_CSRF_ENABLED = False
# 使用内存缓存
CACHE_TYPE = 'simple'
# 禁用邮件发送
MAIL_SUPPRESS_SEND = True
class ProductionConfig(Config):
"""生产环境配置"""
DEBUG = False
@classmethod
def init_app(cls, app):
Config.init_app(app)
# 生产环境日志配置
import logging
from logging.handlers import RotatingFileHandler, SMTPHandler
# 文件日志
if not os.path.exists('logs'):
os.mkdir('logs')
file_handler = RotatingFileHandler(
'logs/flask_blog.log',
maxBytes=10240000,
backupCount=10
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
# 邮件日志(错误级别)
if app.config.get('MAIL_SERVER'):
auth = None
if app.config.get('MAIL_USERNAME') or app.config.get('MAIL_PASSWORD'):
auth = (app.config.get('MAIL_USERNAME'), app.config.get('MAIL_PASSWORD'))
secure = None
if app.config.get('MAIL_USE_TLS'):
secure = ()
mail_handler = SMTPHandler(
mailhost=(app.config.get('MAIL_SERVER'), app.config.get('MAIL_PORT')),
fromaddr=app.config.get('MAIL_DEFAULT_SENDER'),
toaddrs=app.config.get('ADMINS', []),
subject='Flask Blog Application Error',
credentials=auth,
secure=secure
)
mail_handler.setLevel(logging.ERROR)
app.logger.addHandler(mail_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('Flask Blog startup')
class DockerConfig(ProductionConfig):
"""Docker环境配置"""
@classmethod
def init_app(cls, app):
ProductionConfig.init_app(app)
# Docker环境日志输出到stdout
import logging
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.INFO)
app.logger.addHandler(stream_handler)
# 配置字典
config = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
'docker': DockerConfig,
'default': DevelopmentConfig
}
12.8.2 环境变量管理
# .env.example
# 复制此文件为.env并填入实际值
# 基本配置
SECRET_KEY=your-very-secret-key-here
FLASK_ENV=development
FLASK_APP=app.py
# 数据库配置
DATABASE_URL=postgresql://username:password@localhost:5432/flask_blog
DEV_DATABASE_URL=postgresql://username:password@localhost:5432/flask_blog_dev
TEST_DATABASE_URL=postgresql://username:password@localhost:5432/flask_blog_test
# Redis配置
REDIS_URL=redis://localhost:6379/0
# 邮件配置
MAIL_SERVER=smtp.gmail.com
MAIL_PORT=587
MAIL_USE_TLS=true
MAIL_USERNAME=your-email@gmail.com
MAIL_PASSWORD=your-app-password
MAIL_DEFAULT_SENDER=your-email@gmail.com
# 文件上传配置
UPLOAD_FOLDER=uploads
# 搜索配置
ELASTICSEARCH_URL=http://localhost:9200
# 日志配置
LOG_LEVEL=INFO
LOG_FILE=app.log
# 第三方服务
SENTRY_DSN=your-sentry-dsn
AWS_ACCESS_KEY_ID=your-aws-access-key
AWS_SECRET_ACCESS_KEY=your-aws-secret-key
AWS_S3_BUCKET=your-s3-bucket
# app/utils/env_loader.py
import os
from pathlib import Path
def load_env_file(env_file='.env'):
"""加载环境变量文件"""
env_path = Path(env_file)
if not env_path.exists():
return
with open(env_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip().strip('"\'')
# 只有当环境变量不存在时才设置
if key not in os.environ:
os.environ[key] = value
class EnvConfig:
"""环境变量配置管理器"""
@staticmethod
def get(key, default=None, cast=str):
"""获取环境变量并转换类型"""
value = os.environ.get(key, default)
if value is None:
return None
if cast == bool:
return value.lower() in ('true', '1', 'yes', 'on')
elif cast == int:
try:
return int(value)
except ValueError:
return default
elif cast == float:
try:
return float(value)
except ValueError:
return default
elif cast == list:
return [item.strip() for item in value.split(',') if item.strip()]
else:
return cast(value)
@staticmethod
def require(key, cast=str):
"""获取必需的环境变量"""
value = EnvConfig.get(key, cast=cast)
if value is None:
raise ValueError(f'Required environment variable {key} is not set')
return value
@staticmethod
def validate():
"""验证必需的环境变量"""
required_vars = [
'SECRET_KEY',
'DATABASE_URL',
]
missing_vars = []
for var in required_vars:
if not os.environ.get(var):
missing_vars.append(var)
if missing_vars:
raise ValueError(f'Missing required environment variables: {", ".join(missing_vars)}')
12.9 应用工厂模式
12.9.1 应用工厂
# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager
from flask_mail import Mail
from flask_wtf.csrf import CSRFProtect
from flask_caching import Cache
from flask_babel import Babel
from config import config
from app.utils.env_loader import load_env_file, EnvConfig
# 扩展实例
db = SQLAlchemy()
migrate = Migrate()
login_manager = LoginManager()
mail = Mail()
csrf = CSRFProtect()
cache = Cache()
babel = Babel()
def create_app(config_name=None):
"""应用工厂函数"""
# 加载环境变量
load_env_file()
# 创建Flask应用实例
app = Flask(__name__)
# 确定配置
if config_name is None:
config_name = EnvConfig.get('FLASK_ENV', 'development')
# 加载配置
app.config.from_object(config[config_name])
config[config_name].init_app(app)
# 验证环境变量
try:
EnvConfig.validate()
except ValueError as e:
app.logger.error(f'Configuration error: {e}')
raise
# 初始化扩展
init_extensions(app)
# 注册蓝图
register_blueprints(app)
# 注册错误处理器
register_error_handlers(app)
# 注册上下文处理器
register_context_processors(app)
# 注册CLI命令
register_cli_commands(app)
# 配置日志
configure_logging(app)
return app
def init_extensions(app):
"""初始化扩展"""
db.init_app(app)
migrate.init_app(app, db)
# 配置登录管理器
login_manager.init_app(app)
login_manager.login_view = 'auth.login'
login_manager.login_message = '请先登录以访问此页面。'
login_manager.login_message_category = 'info'
@login_manager.user_loader
def load_user(user_id):
from app.models.user import User
return User.query.get(int(user_id))
mail.init_app(app)
csrf.init_app(app)
cache.init_app(app)
babel.init_app(app)
# 初始化搜索
if app.config.get('ELASTICSEARCH_URL'):
from app.services.search_service import SearchService
SearchService.init_app(app)
# 初始化Celery
from app.utils.celery_utils import init_celery
init_celery(app)
def register_blueprints(app):
"""注册蓝图"""
from app.views.main import main_bp
from app.views.auth import auth_bp
from app.views.article import article_bp
from app.api import api_bp
app.register_blueprint(main_bp)
app.register_blueprint(auth_bp, url_prefix='/auth')
app.register_blueprint(article_bp, url_prefix='/article')
app.register_blueprint(api_bp)
def register_error_handlers(app):
"""注册错误处理器"""
from app.utils.error_handlers import register_error_handlers as reg_handlers
reg_handlers(app)
def register_context_processors(app):
"""注册上下文处理器"""
from app.models.category import Category
from app.models.tag import Tag
@app.context_processor
def inject_global_vars():
"""注入全局模板变量"""
return {
'categories': Category.query.all(),
'popular_tags': Tag.query.limit(10).all()
}
def register_cli_commands(app):
"""注册CLI命令"""
from app.cli import register_commands
register_commands(app)
def configure_logging(app):
"""配置日志"""
if not app.debug and not app.testing:
import logging
from logging.handlers import RotatingFileHandler
import os
if not os.path.exists('logs'):
os.mkdir('logs')
file_handler = RotatingFileHandler(
'logs/flask_blog.log',
maxBytes=10240000,
backupCount=10
)
file_handler.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]'
))
file_handler.setLevel(logging.INFO)
app.logger.addHandler(file_handler)
app.logger.setLevel(logging.INFO)
app.logger.info('Flask Blog startup')
12.9.2 CLI命令
# app/cli.py
import click
from flask import current_app
from flask.cli import with_appcontext
from app import db
from app.models.user import User
from app.models.category import Category
from app.models.tag import Tag
from app.models.article import Article
from app.services.search_service import SearchService
import os
def register_commands(app):
"""注册CLI命令"""
app.cli.add_command(init_db_command)
app.cli.add_command(create_admin_command)
app.cli.add_command(import_data_command)
app.cli.add_command(rebuild_search_index_command)
app.cli.add_command(generate_fake_data_command)
@click.command('init-db')
@with_appcontext
def init_db_command():
"""初始化数据库"""
db.create_all()
click.echo('Initialized the database.')
@click.command('create-admin')
@click.option('--username', prompt=True, help='管理员用户名')
@click.option('--email', prompt=True, help='管理员邮箱')
@click.option('--password', prompt=True, hide_input=True, confirmation_prompt=True, help='管理员密码')
@with_appcontext
def create_admin_command(username, email, password):
"""创建管理员用户"""
if User.query.filter_by(username=username).first():
click.echo(f'User {username} already exists.')
return
if User.query.filter_by(email=email).first():
click.echo(f'Email {email} already exists.')
return
admin = User(
username=username,
email=email,
is_admin=True,
is_active=True,
email_confirmed=True
)
admin.set_password(password)
db.session.add(admin)
db.session.commit()
click.echo(f'Admin user {username} created successfully.')
@click.command('import-data')
@click.option('--file', type=click.Path(exists=True), help='数据文件路径')
@with_appcontext
def import_data_command(file):
"""导入数据"""
if not file:
click.echo('Please specify a data file.')
return
# 这里可以实现数据导入逻辑
click.echo(f'Importing data from {file}...')
click.echo('Data import completed.')
@click.command('rebuild-search-index')
@with_appcontext
def rebuild_search_index_command():
"""重建搜索索引"""
if not current_app.config.get('ELASTICSEARCH_URL'):
click.echo('Elasticsearch is not configured.')
return
search_service = SearchService()
# 删除现有索引
search_service.delete_index('articles')
# 重新创建索引
search_service.create_index('articles')
# 重新索引所有文章
articles = Article.query.filter_by(status='published').all()
for article in articles:
search_service.index_article(article)
click.echo(f'Rebuilt search index for {len(articles)} articles.')
@click.command('generate-fake-data')
@click.option('--users', default=10, help='生成用户数量')
@click.option('--articles', default=50, help='生成文章数量')
@click.option('--comments', default=200, help='生成评论数量')
@with_appcontext
def generate_fake_data_command(users, articles, comments):
"""生成测试数据"""
from app.utils.fake_data import FakeDataGenerator
generator = FakeDataGenerator()
# 生成分类和标签
generator.create_categories()
generator.create_tags()
# 生成用户
generator.create_users(users)
# 生成文章
generator.create_articles(articles)
# 生成评论
generator.create_comments(comments)
click.echo(f'Generated {users} users, {articles} articles, and {comments} comments.')
12.10 性能优化
12.10.1 数据库优化
# app/utils/database_optimizer.py
from sqlalchemy import text
from app import db
from app.models.article import Article
from app.models.user import User
from app.models.comment import Comment
import time
class DatabaseOptimizer:
"""数据库性能优化工具"""
@staticmethod
def analyze_slow_queries():
"""分析慢查询"""
# PostgreSQL慢查询分析
slow_queries = db.session.execute(text("""
SELECT query, mean_time, calls, total_time
FROM pg_stat_statements
WHERE mean_time > 100
ORDER BY mean_time DESC
LIMIT 10
""")).fetchall()
return slow_queries
@staticmethod
def create_indexes():
"""创建优化索引"""
indexes = [
# 文章相关索引
"CREATE INDEX IF NOT EXISTS idx_articles_status_published_at ON articles(status, published_at DESC)",
"CREATE INDEX IF NOT EXISTS idx_articles_author_status ON articles(author_id, status)",
"CREATE INDEX IF NOT EXISTS idx_articles_category_status ON articles(category_id, status)",
"CREATE INDEX IF NOT EXISTS idx_articles_slug ON articles(slug)",
"CREATE INDEX IF NOT EXISTS idx_articles_view_count ON articles(view_count DESC)",
# 用户相关索引
"CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)",
"CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)",
"CREATE INDEX IF NOT EXISTS idx_users_is_active ON users(is_active)",
# 评论相关索引
"CREATE INDEX IF NOT EXISTS idx_comments_article_status ON comments(article_id, status)",
"CREATE INDEX IF NOT EXISTS idx_comments_author_created ON comments(author_id, created_at DESC)",
# 标签关联索引
"CREATE INDEX IF NOT EXISTS idx_article_tags_article ON article_tags(article_id)",
"CREATE INDEX IF NOT EXISTS idx_article_tags_tag ON article_tags(tag_id)",
]
for index_sql in indexes:
try:
db.session.execute(text(index_sql))
db.session.commit()
print(f"Created index: {index_sql}")
except Exception as e:
print(f"Error creating index: {e}")
db.session.rollback()
@staticmethod
def optimize_queries():
"""查询优化示例"""
# 使用连接查询替代N+1查询
def get_articles_with_authors():
return Article.query.join(User).options(
db.joinedload(Article.author),
db.joinedload(Article.category),
db.joinedload(Article.tags)
).filter(Article.status == 'published').all()
# 使用子查询优化
def get_popular_articles():
return Article.query.filter(
Article.status == 'published'
).order_by(
Article.view_count.desc()
).limit(10).all()
# 使用聚合查询
def get_article_stats():
return db.session.query(
Article.author_id,
db.func.count(Article.id).label('article_count'),
db.func.sum(Article.view_count).label('total_views')
).filter(
Article.status == 'published'
).group_by(Article.author_id).all()
return {
'articles_with_authors': get_articles_with_authors,
'popular_articles': get_popular_articles,
'article_stats': get_article_stats
}
@staticmethod
def vacuum_analyze():
"""执行数据库维护"""
tables = ['users', 'articles', 'comments', 'categories', 'tags']
for table in tables:
try:
db.session.execute(text(f"VACUUM ANALYZE {table}"))
db.session.commit()
print(f"Vacuumed and analyzed table: {table}")
except Exception as e:
print(f"Error vacuuming table {table}: {e}")
db.session.rollback()
12.10.2 缓存策略
# app/utils/cache_manager.py
from flask import current_app
from app import cache
from functools import wraps
import hashlib
import json
import pickle
from datetime import datetime, timedelta
class CacheManager:
"""缓存管理器"""
# 缓存键前缀
PREFIXES = {
'article': 'article:',
'user': 'user:',
'category': 'category:',
'tag': 'tag:',
'search': 'search:',
'stats': 'stats:'
}
# 缓存过期时间(秒)
TIMEOUTS = {
'short': 300, # 5分钟
'medium': 1800, # 30分钟
'long': 3600, # 1小时
'daily': 86400, # 24小时
}
@classmethod
def make_key(cls, prefix, *args, **kwargs):
"""生成缓存键"""
key_data = {
'args': args,
'kwargs': sorted(kwargs.items())
}
key_str = json.dumps(key_data, sort_keys=True)
key_hash = hashlib.md5(key_str.encode()).hexdigest()
return f"{cls.PREFIXES.get(prefix, '')}{key_hash}"
@classmethod
def get(cls, key):
"""获取缓存"""
try:
return cache.get(key)
except Exception as e:
current_app.logger.error(f"Cache get error: {e}")
return None
@classmethod
def set(cls, key, value, timeout=None):
"""设置缓存"""
try:
if timeout is None:
timeout = cls.TIMEOUTS['medium']
return cache.set(key, value, timeout=timeout)
except Exception as e:
current_app.logger.error(f"Cache set error: {e}")
return False
@classmethod
def delete(cls, key):
"""删除缓存"""
try:
return cache.delete(key)
except Exception as e:
current_app.logger.error(f"Cache delete error: {e}")
return False
@classmethod
def delete_pattern(cls, pattern):
"""删除匹配模式的缓存"""
try:
# 这需要Redis支持
from app import redis_client
keys = redis_client.keys(pattern)
if keys:
return redis_client.delete(*keys)
return 0
except Exception as e:
current_app.logger.error(f"Cache delete pattern error: {e}")
return 0
@classmethod
def invalidate_article_cache(cls, article_id):
"""使文章相关缓存失效"""
patterns = [
f"{cls.PREFIXES['article']}*",
f"{cls.PREFIXES['stats']}*",
f"{cls.PREFIXES['search']}*"
]
for pattern in patterns:
cls.delete_pattern(pattern)
def cached(prefix='default', timeout=None, key_func=None):
"""缓存装饰器"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 生成缓存键
if key_func:
cache_key = key_func(*args, **kwargs)
else:
cache_key = CacheManager.make_key(prefix, *args, **kwargs)
# 尝试从缓存获取
result = CacheManager.get(cache_key)
if result is not None:
return result
# 执行函数并缓存结果
result = f(*args, **kwargs)
CacheManager.set(cache_key, result, timeout)
return result
return decorated_function
return decorator
# 使用示例
class ArticleCacheService:
"""文章缓存服务"""
@staticmethod
@cached(prefix='article', timeout=CacheManager.TIMEOUTS['long'])
def get_popular_articles(limit=10):
"""获取热门文章(缓存1小时)"""
from app.models.article import Article
return Article.query.filter_by(status='published').order_by(
Article.view_count.desc()
).limit(limit).all()
@staticmethod
@cached(prefix='article', timeout=CacheManager.TIMEOUTS['medium'])
def get_recent_articles(limit=10):
"""获取最新文章(缓存30分钟)"""
from app.models.article import Article
return Article.query.filter_by(status='published').order_by(
Article.published_at.desc()
).limit(limit).all()
@staticmethod
@cached(prefix='stats', timeout=CacheManager.TIMEOUTS['daily'])
def get_site_stats():
"""获取站点统计(缓存24小时)"""
from app.models.article import Article
from app.models.user import User
from app.models.comment import Comment
return {
'total_articles': Article.query.filter_by(status='published').count(),
'total_users': User.query.filter_by(is_active=True).count(),
'total_comments': Comment.query.filter_by(status='approved').count(),
'total_views': db.session.query(db.func.sum(Article.view_count)).scalar() or 0
}
12.10.3 前端优化
// app/static/js/performance.js
class PerformanceOptimizer {
constructor() {
this.init();
}
init() {
this.setupLazyLoading();
this.setupImageOptimization();
this.setupCacheStrategies();
this.setupPerformanceMonitoring();
}
// 懒加载图片
setupLazyLoading() {
if ('IntersectionObserver' in window) {
const imageObserver = new IntersectionObserver((entries, observer) => {
entries.forEach(entry => {
if (entry.isIntersecting) {
const img = entry.target;
img.src = img.dataset.src;
img.classList.remove('lazy');
observer.unobserve(img);
}
});
});
document.querySelectorAll('img[data-src]').forEach(img => {
imageObserver.observe(img);
});
}
}
// 图片优化
setupImageOptimization() {
// WebP支持检测
const supportsWebP = () => {
const canvas = document.createElement('canvas');
return canvas.toDataURL('image/webp').indexOf('data:image/webp') === 0;
};
if (supportsWebP()) {
document.documentElement.classList.add('webp');
}
// 响应式图片
this.setupResponsiveImages();
}
setupResponsiveImages() {
const images = document.querySelectorAll('img[data-sizes]');
const updateImageSrc = () => {
images.forEach(img => {
const sizes = JSON.parse(img.dataset.sizes);
const windowWidth = window.innerWidth;
let selectedSrc = sizes.default;
for (const [breakpoint, src] of Object.entries(sizes)) {
if (windowWidth >= parseInt(breakpoint)) {
selectedSrc = src;
}
}
if (img.src !== selectedSrc) {
img.src = selectedSrc;
}
});
};
updateImageSrc();
window.addEventListener('resize', this.debounce(updateImageSrc, 250));
}
// 缓存策略
setupCacheStrategies() {
// Service Worker注册
if ('serviceWorker' in navigator) {
navigator.serviceWorker.register('/static/js/sw.js')
.then(registration => {
console.log('SW registered: ', registration);
})
.catch(registrationError => {
console.log('SW registration failed: ', registrationError);
});
}
// 本地存储缓存
this.setupLocalStorageCache();
}
setupLocalStorageCache() {
const cache = {
set(key, data, ttl = 3600000) { // 默认1小时
const item = {
data: data,
timestamp: Date.now(),
ttl: ttl
};
localStorage.setItem(key, JSON.stringify(item));
},
get(key) {
const item = localStorage.getItem(key);
if (!item) return null;
const parsed = JSON.parse(item);
if (Date.now() - parsed.timestamp > parsed.ttl) {
localStorage.removeItem(key);
return null;
}
return parsed.data;
},
remove(key) {
localStorage.removeItem(key);
}
};
window.appCache = cache;
}
// 性能监控
setupPerformanceMonitoring() {
// 页面加载性能
window.addEventListener('load', () => {
setTimeout(() => {
const perfData = performance.getEntriesByType('navigation')[0];
const metrics = {
dns: perfData.domainLookupEnd - perfData.domainLookupStart,
tcp: perfData.connectEnd - perfData.connectStart,
request: perfData.responseStart - perfData.requestStart,
response: perfData.responseEnd - perfData.responseStart,
dom: perfData.domContentLoadedEventEnd - perfData.domContentLoadedEventStart,
load: perfData.loadEventEnd - perfData.loadEventStart,
total: perfData.loadEventEnd - perfData.navigationStart
};
// 发送性能数据到服务器
this.sendPerformanceData(metrics);
}, 0);
});
// Core Web Vitals
this.measureCoreWebVitals();
}
measureCoreWebVitals() {
// Largest Contentful Paint (LCP)
new PerformanceObserver((entryList) => {
const entries = entryList.getEntries();
const lastEntry = entries[entries.length - 1];
console.log('LCP:', lastEntry.startTime);
}).observe({ entryTypes: ['largest-contentful-paint'] });
// First Input Delay (FID)
new PerformanceObserver((entryList) => {
const entries = entryList.getEntries();
entries.forEach(entry => {
console.log('FID:', entry.processingStart - entry.startTime);
});
}).observe({ entryTypes: ['first-input'] });
// Cumulative Layout Shift (CLS)
let clsValue = 0;
new PerformanceObserver((entryList) => {
const entries = entryList.getEntries();
entries.forEach(entry => {
if (!entry.hadRecentInput) {
clsValue += entry.value;
}
});
console.log('CLS:', clsValue);
}).observe({ entryTypes: ['layout-shift'] });
}
sendPerformanceData(metrics) {
// 发送到分析服务
fetch('/api/v1/analytics/performance', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(metrics)
}).catch(err => console.error('Failed to send performance data:', err));
}
// 工具函数
debounce(func, wait) {
let timeout;
return function executedFunction(...args) {
const later = () => {
clearTimeout(timeout);
func(...args);
};
clearTimeout(timeout);
timeout = setTimeout(later, wait);
};
}
}
// 初始化性能优化器
document.addEventListener('DOMContentLoaded', () => {
new PerformanceOptimizer();
});
12.11 项目总结
12.11.1 技术架构总结
本项目采用了现代化的Flask Web开发架构,主要技术特点包括:
后端架构: - Flask框架:轻量级、灵活的Web框架 - SQLAlchemy ORM:强大的数据库抽象层 - PostgreSQL:可靠的关系型数据库 - Redis:高性能缓存和会话存储 - Celery:异步任务队列 - Elasticsearch:全文搜索引擎
前端技术: - Bootstrap 5:响应式UI框架 - HTMX:现代化的前端交互 - CKEditor:富文本编辑器 - Font Awesome:图标库
开发工具: - Flask-Migrate:数据库迁移 - Flask-Login:用户认证 - Flask-Mail:邮件发送 - Flask-WTF:表单处理和CSRF保护 - Flask-Caching:缓存管理
12.11.2 核心功能实现
用户管理系统: - 用户注册、登录、登出 - 邮箱验证和密码重置 - 用户资料管理 - 权限控制和角色管理
内容管理系统: - 文章的创建、编辑、发布 - 分类和标签管理 - 富文本编辑支持 - 文章状态管理(草稿、发布、下线)
评论系统: - 评论发表和管理 - 评论审核机制 - 垃圾评论过滤
搜索功能: - 基于Elasticsearch的全文搜索 - 搜索结果高亮 - 搜索建议和自动完成
性能优化: - 多层缓存策略 - 数据库查询优化 - 静态资源优化 - 异步任务处理
12.11.3 最佳实践应用
代码组织: - 蓝图模式组织路由 - 服务层分离业务逻辑 - 模型层封装数据操作 - 工具类提供通用功能
安全措施: - CSRF保护 - SQL注入防护 - XSS攻击防护 - 密码安全存储 - 权限验证和授权
测试策略: - 单元测试覆盖核心功能 - 集成测试验证业务流程 - 性能测试确保系统稳定 - 安全测试发现潜在漏洞
部署运维: - Docker容器化部署 - 环境配置管理 - 日志监控和告警 - 自动化部署流程
12.11.4 扩展建议
功能扩展: 1. 社交功能:用户关注、点赞、分享 2. 多媒体支持:图片、视频、音频上传 3. 移动端适配:响应式设计优化 4. 国际化支持:多语言界面 5. API扩展:GraphQL支持
性能优化: 1. CDN集成:静态资源加速 2. 数据库分片:大数据量处理 3. 微服务架构:服务拆分和治理 4. 消息队列:异步处理优化
运维增强: 1. 监控告警:完善的监控体系 2. 自动扩缩容:基于负载的自动调整 3. 灾备方案:数据备份和恢复 4. 安全加固:安全扫描和防护
12.11.5 学习收获
通过本项目的实战开发,我们掌握了:
- Flask框架的深度应用:从基础路由到高级特性
- 数据库设计和优化:关系设计、索引优化、查询调优
- 前后端分离开发:API设计、前端交互、状态管理
- 系统架构设计:分层架构、服务化、可扩展性
- 性能优化技巧:缓存策略、数据库优化、前端优化
- 安全防护措施:常见攻击防护、权限控制、数据保护
- 测试驱动开发:单元测试、集成测试、自动化测试
- 部署运维实践:容器化、监控、日志、备份
这个项目展示了现代Web应用开发的完整流程,从需求分析到架构设计,从编码实现到测试部署,涵盖了Web开发的各个方面。通过实际动手实践,能够深入理解Flask框架的精髓,掌握Web开发的核心技能。
练习题
基础练习
用户系统扩展:
- 添加用户头像上传功能
- 实现用户个人主页
- 添加用户关注功能
文章功能增强:
- 实现文章草稿自动保存
- 添加文章阅读进度条
- 实现文章收藏功能
评论系统优化:
- 实现评论回复功能
- 添加评论点赞功能
- 实现评论举报机制
进阶练习
搜索功能完善:
- 实现搜索历史记录
- 添加高级搜索选项
- 实现搜索结果排序
性能优化实践:
- 实现页面静态化
- 添加图片懒加载
- 优化数据库查询
API接口扩展:
- 实现GraphQL接口
- 添加API版本控制
- 实现API限流机制
高级练习
微服务改造:
- 将用户服务独立出来
- 实现服务间通信
- 添加服务发现机制
实时功能开发:
- 实现实时评论推送
- 添加在线用户统计
- 实现实时通知系统
数据分析功能:
- 实现用户行为分析
- 添加文章热度算法
- 实现推荐系统
这些练习题从基础到高级,涵盖了Web开发的各个方面,通过完成这些练习,可以进一步提升Flask开发技能和系统设计能力。
12.12 项目部署脚本
12.12.1 自动化部署脚本
#!/bin/bash
# deploy.sh - 自动化部署脚本
set -e # 遇到错误立即退出
# 配置变量
APP_NAME="flask-blog"
APP_DIR="/var/www/flask-blog"
GIT_REPO="https://github.com/your-username/flask-blog.git"
BRANCH="main"
PYTHON_VERSION="3.9"
VENV_DIR="$APP_DIR/venv"
SERVICE_NAME="flask-blog"
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# 日志函数
log_info() {
echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# 检查是否为root用户
check_root() {
if [[ $EUID -eq 0 ]]; then
log_error "请不要使用root用户运行此脚本"
exit 1
fi
}
# 检查系统依赖
check_dependencies() {
log_info "检查系统依赖..."
# 检查Python
if ! command -v python3 &> /dev/null; then
log_error "Python3 未安装"
exit 1
fi
# 检查Git
if ! command -v git &> /dev/null; then
log_error "Git 未安装"
exit 1
fi
# 检查PostgreSQL
if ! command -v psql &> /dev/null; then
log_warn "PostgreSQL 客户端未安装,请确保数据库可访问"
fi
# 检查Redis
if ! command -v redis-cli &> /dev/null; then
log_warn "Redis 客户端未安装,请确保Redis服务可访问"
fi
log_info "依赖检查完成"
}
# 创建应用目录
setup_directory() {
log_info "设置应用目录..."
if [ ! -d "$APP_DIR" ]; then
sudo mkdir -p "$APP_DIR"
sudo chown $USER:$USER "$APP_DIR"
fi
cd "$APP_DIR"
}
# 克隆或更新代码
update_code() {
log_info "更新应用代码..."
if [ -d ".git" ]; then
log_info "更新现有代码库..."
git fetch origin
git reset --hard origin/$BRANCH
else
log_info "克隆代码库..."
git clone -b $BRANCH $GIT_REPO .
fi
}
# 设置Python虚拟环境
setup_virtualenv() {
log_info "设置Python虚拟环境..."
if [ ! -d "$VENV_DIR" ]; then
python3 -m venv "$VENV_DIR"
fi
source "$VENV_DIR/bin/activate"
pip install --upgrade pip
pip install -r requirements.txt
}
# 配置环境变量
setup_environment() {
log_info "配置环境变量..."
if [ ! -f ".env" ]; then
if [ -f ".env.example" ]; then
cp .env.example .env
log_warn "请编辑 .env 文件配置实际的环境变量"
else
log_error ".env.example 文件不存在"
exit 1
fi
fi
}
# 数据库迁移
run_migrations() {
log_info "运行数据库迁移..."
source "$VENV_DIR/bin/activate"
# 初始化迁移(如果需要)
if [ ! -d "migrations" ]; then
flask db init
fi
# 生成迁移文件
flask db migrate -m "Auto migration $(date '+%Y%m%d_%H%M%S')"
# 应用迁移
flask db upgrade
}
# 收集静态文件
collect_static() {
log_info "收集静态文件..."
# 如果使用CDN或静态文件服务器,在这里处理
# 例如:压缩CSS/JS文件,上传到CDN等
if [ -d "app/static" ]; then
# 压缩静态文件
find app/static -name "*.css" -exec gzip -k {} \;
find app/static -name "*.js" -exec gzip -k {} \;
fi
}
# 创建systemd服务
setup_systemd_service() {
log_info "设置systemd服务..."
cat > /tmp/$SERVICE_NAME.service << EOF
[Unit]
Description=Flask Blog Application
After=network.target
[Service]
Type=exec
User=$USER
Group=$USER
WorkingDirectory=$APP_DIR
Environment=PATH=$VENV_DIR/bin
EnvironmentFile=$APP_DIR/.env
ExecStart=$VENV_DIR/bin/gunicorn --bind 127.0.0.1:5000 --workers 4 --timeout 120 app:app
ExecReload=/bin/kill -s HUP \$MAINPID
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF
sudo mv /tmp/$SERVICE_NAME.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable $SERVICE_NAME
}
# 配置Nginx
setup_nginx() {
log_info "配置Nginx..."
cat > /tmp/$APP_NAME.nginx << EOF
server {
listen 80;
server_name your-domain.com www.your-domain.com;
# 重定向到HTTPS
return 301 https://\$server_name\$request_uri;
}
server {
listen 443 ssl http2;
server_name your-domain.com www.your-domain.com;
# SSL配置
ssl_certificate /etc/letsencrypt/live/your-domain.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/your-domain.com/privkey.pem;
# 安全头
add_header X-Frame-Options DENY;
add_header X-Content-Type-Options nosniff;
add_header X-XSS-Protection "1; mode=block";
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# 静态文件
location /static {
alias $APP_DIR/app/static;
expires 1y;
add_header Cache-Control "public, immutable";
# Gzip压缩
gzip_static on;
}
# 上传文件
location /uploads {
alias $APP_DIR/uploads;
expires 1y;
add_header Cache-Control "public";
}
# 应用代理
location / {
proxy_pass http://127.0.0.1:5000;
proxy_set_header Host \$host;
proxy_set_header X-Real-IP \$remote_addr;
proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto \$scheme;
# 超时设置
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
}
# 健康检查
location /health {
proxy_pass http://127.0.0.1:5000/health;
access_log off;
}
}
EOF
sudo mv /tmp/$APP_NAME.nginx /etc/nginx/sites-available/$APP_NAME
sudo ln -sf /etc/nginx/sites-available/$APP_NAME /etc/nginx/sites-enabled/
sudo nginx -t && sudo systemctl reload nginx
}
# 启动服务
start_services() {
log_info "启动服务..."
# 启动应用服务
sudo systemctl start $SERVICE_NAME
sudo systemctl status $SERVICE_NAME
# 检查服务状态
if systemctl is-active --quiet $SERVICE_NAME; then
log_info "应用服务启动成功"
else
log_error "应用服务启动失败"
exit 1
fi
}
# 运行测试
run_tests() {
log_info "运行测试..."
source "$VENV_DIR/bin/activate"
# 运行单元测试
python -m pytest tests/ -v
# 运行健康检查
curl -f http://localhost:5000/health || {
log_error "健康检查失败"
exit 1
}
}
# 清理函数
cleanup() {
log_info "清理临时文件..."
# 清理旧的日志文件
find logs/ -name "*.log" -mtime +30 -delete 2>/dev/null || true
# 清理Python缓存
find . -name "__pycache__" -type d -exec rm -rf {} + 2>/dev/null || true
find . -name "*.pyc" -delete 2>/dev/null || true
}
# 主函数
main() {
log_info "开始部署 $APP_NAME..."
check_root
check_dependencies
setup_directory
update_code
setup_virtualenv
setup_environment
run_migrations
collect_static
setup_systemd_service
setup_nginx
start_services
run_tests
cleanup
log_info "部署完成!"
log_info "应用已在 https://your-domain.com 上运行"
}
# 如果直接运行脚本
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
main "$@"
fi
12.12.2 Docker部署脚本
#!/bin/bash
# docker-deploy.sh - Docker部署脚本
set -e
# 配置变量
APP_NAME="flask-blog"
IMAGE_NAME="flask-blog:latest"
CONTAINER_NAME="flask-blog-app"
NETWORK_NAME="flask-blog-network"
DB_CONTAINER="flask-blog-db"
REDIS_CONTAINER="flask-blog-redis"
# 颜色输出
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
log_info() {
echo -e "${GREEN}[INFO]${NC} $1"
}
log_warn() {
echo -e "${YELLOW}[WARN]${NC} $1"
}
log_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# 检查Docker
check_docker() {
if ! command -v docker &> /dev/null; then
log_error "Docker 未安装"
exit 1
fi
if ! command -v docker-compose &> /dev/null; then
log_error "Docker Compose 未安装"
exit 1
fi
}
# 构建镜像
build_image() {
log_info "构建Docker镜像..."
docker build -t $IMAGE_NAME .
# 清理悬空镜像
docker image prune -f
}
# 创建网络
setup_network() {
log_info "设置Docker网络..."
if ! docker network ls | grep -q $NETWORK_NAME; then
docker network create $NETWORK_NAME
fi
}
# 启动数据库
start_database() {
log_info "启动PostgreSQL数据库..."
if ! docker ps | grep -q $DB_CONTAINER; then
docker run -d \
--name $DB_CONTAINER \
--network $NETWORK_NAME \
-e POSTGRES_DB=flask_blog \
-e POSTGRES_USER=flask_user \
-e POSTGRES_PASSWORD=flask_password \
-v postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \
postgres:13
fi
# 等待数据库启动
log_info "等待数据库启动..."
sleep 10
}
# 启动Redis
start_redis() {
log_info "启动Redis..."
if ! docker ps | grep -q $REDIS_CONTAINER; then
docker run -d \
--name $REDIS_CONTAINER \
--network $NETWORK_NAME \
-v redis_data:/data \
-p 6379:6379 \
redis:6-alpine
fi
}
# 运行数据库迁移
run_migrations() {
log_info "运行数据库迁移..."
docker run --rm \
--network $NETWORK_NAME \
-e DATABASE_URL="postgresql://flask_user:flask_password@$DB_CONTAINER:5432/flask_blog" \
-e FLASK_APP=app.py \
$IMAGE_NAME \
flask db upgrade
}
# 启动应用
start_app() {
log_info "启动应用容器..."
# 停止现有容器
docker stop $CONTAINER_NAME 2>/dev/null || true
docker rm $CONTAINER_NAME 2>/dev/null || true
# 启动新容器
docker run -d \
--name $CONTAINER_NAME \
--network $NETWORK_NAME \
-p 5000:5000 \
-e DATABASE_URL="postgresql://flask_user:flask_password@$DB_CONTAINER:5432/flask_blog" \
-e REDIS_URL="redis://$REDIS_CONTAINER:6379/0" \
-e FLASK_ENV=production \
-v $(pwd)/uploads:/app/uploads \
--restart unless-stopped \
$IMAGE_NAME
}
# 健康检查
health_check() {
log_info "执行健康检查..."
# 等待应用启动
sleep 10
# 检查应用健康状态
if curl -f http://localhost:5000/health; then
log_info "应用健康检查通过"
else
log_error "应用健康检查失败"
docker logs $CONTAINER_NAME
exit 1
fi
}
# 使用Docker Compose部署
deploy_with_compose() {
log_info "使用Docker Compose部署..."
# 创建docker-compose.yml
cat > docker-compose.yml << EOF
version: '3.8'
services:
app:
build: .
ports:
- "5000:5000"
environment:
- DATABASE_URL=postgresql://flask_user:flask_password@db:5432/flask_blog
- REDIS_URL=redis://redis:6379/0
- FLASK_ENV=production
volumes:
- ./uploads:/app/uploads
depends_on:
- db
- redis
restart: unless-stopped
db:
image: postgres:13
environment:
- POSTGRES_DB=flask_blog
- POSTGRES_USER=flask_user
- POSTGRES_PASSWORD=flask_password
volumes:
- postgres_data:/var/lib/postgresql/data
restart: unless-stopped
redis:
image: redis:6-alpine
volumes:
- redis_data:/data
restart: unless-stopped
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./ssl:/etc/nginx/ssl
depends_on:
- app
restart: unless-stopped
volumes:
postgres_data:
redis_data:
EOF
# 启动服务
docker-compose up -d
# 运行迁移
docker-compose exec app flask db upgrade
}
# 主函数
main() {
case "${1:-compose}" in
"manual")
log_info "手动Docker部署模式"
check_docker
build_image
setup_network
start_database
start_redis
run_migrations
start_app
health_check
;;
"compose")
log_info "Docker Compose部署模式"
check_docker
deploy_with_compose
;;
*)
log_error "未知部署模式: $1"
log_info "使用方式: $0 [manual|compose]"
exit 1
;;
esac
log_info "Docker部署完成!"
}
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
main "$@"
fi
12.13 测试用例
12.13.1 单元测试
# tests/test_models.py
import pytest
from datetime import datetime
from app import create_app, db
from app.models.user import User
from app.models.article import Article
from app.models.category import Category
from app.models.comment import Comment
@pytest.fixture
def app():
"""创建测试应用"""
app = create_app('testing')
with app.app_context():
db.create_all()
yield app
db.drop_all()
@pytest.fixture
def client(app):
"""创建测试客户端"""
return app.test_client()
@pytest.fixture
def user(app):
"""创建测试用户"""
user = User(
username='testuser',
email='test@example.com',
is_active=True
)
user.set_password('password123')
db.session.add(user)
db.session.commit()
return user
@pytest.fixture
def category(app):
"""创建测试分类"""
category = Category(name='测试分类', slug='test-category')
db.session.add(category)
db.session.commit()
return category
class TestUser:
"""用户模型测试"""
def test_password_hashing(self, app, user):
"""测试密码哈希"""
assert user.password_hash != 'password123'
assert user.check_password('password123')
assert not user.check_password('wrongpassword')
def test_email_confirmation_token(self, app, user):
"""测试邮箱确认令牌"""
token = user.generate_confirmation_token()
assert user.confirm_email(token)
assert user.email_confirmed
def test_password_reset_token(self, app, user):
"""测试密码重置令牌"""
token = user.generate_reset_token()
assert user.reset_password(token, 'newpassword123')
assert user.check_password('newpassword123')
def test_user_repr(self, app, user):
"""测试用户字符串表示"""
assert repr(user) == '<User testuser>'
def test_user_to_dict(self, app, user):
"""测试用户转字典"""
user_dict = user.to_dict()
assert user_dict['username'] == 'testuser'
assert user_dict['email'] == 'test@example.com'
assert 'password_hash' not in user_dict
class TestArticle:
"""文章模型测试"""
def test_article_creation(self, app, user, category):
"""测试文章创建"""
article = Article(
title='测试文章',
content='这是测试内容',
author=user,
category=category
)
db.session.add(article)
db.session.commit()
assert article.slug == 'ce-shi-wen-zhang'
assert article.status == 'draft'
assert article.author_id == user.id
assert article.category_id == category.id
def test_article_publish(self, app, user, category):
"""测试文章发布"""
article = Article(
title='测试文章',
content='这是测试内容',
author=user,
category=category
)
db.session.add(article)
db.session.commit()
article.publish()
assert article.status == 'published'
assert article.published_at is not None
def test_article_reading_time(self, app, user, category):
"""测试阅读时间计算"""
content = ' '.join(['word'] * 300) # 300个单词
article = Article(
title='测试文章',
content=content,
author=user,
category=category
)
reading_time = article.get_reading_time()
assert reading_time >= 1 # 至少1分钟
def test_article_summary(self, app, user, category):
"""测试文章摘要"""
long_content = 'A' * 200
article = Article(
title='测试文章',
content=long_content,
author=user,
category=category
)
summary = article.get_summary(100)
assert len(summary) <= 103 # 100 + '...'
assert summary.endswith('...')
class TestComment:
"""评论模型测试"""
def test_comment_creation(self, app, user, category):
"""测试评论创建"""
article = Article(
title='测试文章',
content='这是测试内容',
author=user,
category=category
)
db.session.add(article)
db.session.commit()
comment = Comment(
content='这是测试评论',
author=user,
article=article
)
db.session.add(comment)
db.session.commit()
assert comment.status == 'pending'
assert comment.author_id == user.id
assert comment.article_id == article.id
def test_comment_approval(self, app, user, category):
"""测试评论审核"""
article = Article(
title='测试文章',
content='这是测试内容',
author=user,
category=category
)
db.session.add(article)
db.session.commit()
comment = Comment(
content='这是测试评论',
author=user,
article=article
)
db.session.add(comment)
db.session.commit()
comment.approve()
assert comment.status == 'approved'
comment.mark_as_spam()
assert comment.status == 'spam'
12.13.2 集成测试
# tests/test_views.py
import pytest
import json
from flask import url_for
from app import create_app, db
from app.models.user import User
from app.models.article import Article
from app.models.category import Category
@pytest.fixture
def app():
"""创建测试应用"""
app = create_app('testing')
with app.app_context():
db.create_all()
yield app
db.drop_all()
@pytest.fixture
def client(app):
"""创建测试客户端"""
return app.test_client()
@pytest.fixture
def auth_user(app):
"""创建认证用户"""
user = User(
username='testuser',
email='test@example.com',
is_active=True,
email_confirmed=True
)
user.set_password('password123')
db.session.add(user)
db.session.commit()
return user
@pytest.fixture
def admin_user(app):
"""创建管理员用户"""
user = User(
username='admin',
email='admin@example.com',
is_active=True,
is_admin=True,
email_confirmed=True
)
user.set_password('admin123')
db.session.add(user)
db.session.commit()
return user
class TestAuthViews:
"""认证视图测试"""
def test_register_get(self, client):
"""测试注册页面GET请求"""
response = client.get('/auth/register')
assert response.status_code == 200
assert b'\xe6\xb3\xa8\xe5\x86\x8c' in response.data # '注册'
def test_register_post_valid(self, client):
"""测试有效注册POST请求"""
data = {
'username': 'newuser',
'email': 'newuser@example.com',
'password': 'password123',
'password2': 'password123'
}
response = client.post('/auth/register', data=data, follow_redirects=True)
assert response.status_code == 200
# 检查用户是否创建
user = User.query.filter_by(username='newuser').first()
assert user is not None
assert user.email == 'newuser@example.com'
def test_register_post_invalid(self, client):
"""测试无效注册POST请求"""
data = {
'username': 'newuser',
'email': 'invalid-email',
'password': 'password123',
'password2': 'different-password'
}
response = client.post('/auth/register', data=data)
assert response.status_code == 200
# 检查用户未创建
user = User.query.filter_by(username='newuser').first()
assert user is None
def test_login_get(self, client):
"""测试登录页面GET请求"""
response = client.get('/auth/login')
assert response.status_code == 200
assert b'\xe7\x99\xbb\xe5\xbd\x95' in response.data # '登录'
def test_login_post_valid(self, client, auth_user):
"""测试有效登录POST请求"""
data = {
'email': 'test@example.com',
'password': 'password123'
}
response = client.post('/auth/login', data=data, follow_redirects=True)
assert response.status_code == 200
def test_login_post_invalid(self, client, auth_user):
"""测试无效登录POST请求"""
data = {
'email': 'test@example.com',
'password': 'wrongpassword'
}
response = client.post('/auth/login', data=data)
assert response.status_code == 200
# 应该显示错误消息
def test_logout(self, client, auth_user):
"""测试登出"""
# 先登录
client.post('/auth/login', data={
'email': 'test@example.com',
'password': 'password123'
})
# 然后登出
response = client.get('/auth/logout', follow_redirects=True)
assert response.status_code == 200
class TestMainViews:
"""主要视图测试"""
def test_index(self, client):
"""测试首页"""
response = client.get('/')
assert response.status_code == 200
def test_article_detail(self, client, auth_user):
"""测试文章详情页"""
# 创建分类和文章
category = Category(name='测试分类', slug='test')
db.session.add(category)
db.session.commit()
article = Article(
title='测试文章',
content='测试内容',
author=auth_user,
category=category,
status='published'
)
article.publish()
db.session.add(article)
db.session.commit()
response = client.get(f'/article/{article.id}')
assert response.status_code == 200
assert b'\xe6\xb5\x8b\xe8\xaf\x95\xe6\x96\x87\xe7\xab\xa0' in response.data # '测试文章'
def test_search(self, client):
"""测试搜索功能"""
response = client.get('/search?q=test')
assert response.status_code == 200
class TestAPIViews:
"""API视图测试"""
def test_api_articles_get(self, client, auth_user):
"""测试获取文章列表API"""
response = client.get('/api/v1/articles')
assert response.status_code == 200
data = json.loads(response.data)
assert 'articles' in data
assert 'pagination' in data
def test_api_articles_post_unauthorized(self, client):
"""测试未授权创建文章API"""
data = {
'title': '测试文章',
'content': '测试内容'
}
response = client.post('/api/v1/articles',
data=json.dumps(data),
content_type='application/json')
assert response.status_code == 401
def test_api_articles_post_authorized(self, client, auth_user):
"""测试授权创建文章API"""
# 首先获取JWT令牌
login_response = client.post('/api/v1/auth/login',
data=json.dumps({
'email': 'test@example.com',
'password': 'password123'
}),
content_type='application/json')
token_data = json.loads(login_response.data)
token = token_data['access_token']
# 创建分类
category = Category(name='测试分类', slug='test')
db.session.add(category)
db.session.commit()
# 使用令牌创建文章
data = {
'title': '测试文章',
'content': '测试内容',
'category_id': category.id
}
response = client.post('/api/v1/articles',
data=json.dumps(data),
content_type='application/json',
headers={'Authorization': f'Bearer {token}'})
assert response.status_code == 201
response_data = json.loads(response.data)
assert response_data['title'] == '测试文章'
12.13.3 性能测试
# tests/test_performance.py
import pytest
import time
from concurrent.futures import ThreadPoolExecutor
from app import create_app, db
from app.models.user import User
from app.models.article import Article
from app.models.category import Category
@pytest.fixture
def app():
"""创建测试应用"""
app = create_app('testing')
with app.app_context():
db.create_all()
yield app
db.drop_all()
@pytest.fixture
def client(app):
"""创建测试客户端"""
return app.test_client()
class TestPerformance:
"""性能测试"""
def test_homepage_load_time(self, client):
"""测试首页加载时间"""
start_time = time.time()
response = client.get('/')
end_time = time.time()
assert response.status_code == 200
load_time = end_time - start_time
assert load_time < 1.0 # 首页应在1秒内加载完成
def test_concurrent_requests(self, client):
"""测试并发请求"""
def make_request():
return client.get('/')
# 模拟10个并发请求
with ThreadPoolExecutor(max_workers=10) as executor:
start_time = time.time()
futures = [executor.submit(make_request) for _ in range(10)]
responses = [future.result() for future in futures]
end_time = time.time()
# 检查所有请求都成功
for response in responses:
assert response.status_code == 200
# 检查总时间
total_time = end_time - start_time
assert total_time < 5.0 # 10个并发请求应在5秒内完成
def test_database_query_performance(self, app):
"""测试数据库查询性能"""
with app.app_context():
# 创建测试数据
category = Category(name='测试分类', slug='test')
db.session.add(category)
db.session.commit()
user = User(username='testuser', email='test@example.com')
user.set_password('password')
db.session.add(user)
db.session.commit()
# 创建100篇文章
articles = []
for i in range(100):
article = Article(
title=f'测试文章 {i}',
content=f'测试内容 {i}',
author=user,
category=category,
status='published'
)
articles.append(article)
db.session.add_all(articles)
db.session.commit()
# 测试查询性能
start_time = time.time()
result = Article.query.filter_by(status='published').limit(10).all()
end_time = time.time()
assert len(result) == 10
query_time = end_time - start_time
assert query_time < 0.1 # 查询应在100ms内完成
def test_memory_usage(self, app):
"""测试内存使用"""
import psutil
import os
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss
with app.app_context():
# 执行一些操作
for i in range(1000):
user = User(username=f'user{i}', email=f'user{i}@example.com')
# 不保存到数据库,只是创建对象
final_memory = process.memory_info().rss
memory_increase = final_memory - initial_memory
# 内存增长应该在合理范围内(比如50MB)
assert memory_increase < 50 * 1024 * 1024
12.14 项目总结与展望
12.14.1 项目成果
通过本项目的完整实现,我们成功构建了一个功能完善的Flask博客系统,主要成果包括:
技术成果: 1. 完整的Web应用架构:采用MVC模式,代码结构清晰,易于维护 2. 现代化的技术栈:Flask + SQLAlchemy + PostgreSQL + Redis + Elasticsearch 3. 全面的功能模块:用户管理、内容管理、评论系统、搜索功能 4. 性能优化实践:多层缓存、数据库优化、前端优化 5. 安全防护措施:CSRF保护、XSS防护、SQL注入防护 6. 完善的测试体系:单元测试、集成测试、性能测试 7. 自动化部署:Docker容器化、CI/CD流程
学习成果: 1. 深入理解Flask框架:从基础概念到高级特性的全面掌握 2. 数据库设计能力:关系设计、索引优化、查询调优 3. 前端开发技能:响应式设计、用户体验优化 4. 系统架构思维:分层设计、服务化、可扩展性考虑 5. DevOps实践:容器化部署、监控告警、日志管理
12.14.2 技术亮点
架构设计亮点: - 应用工厂模式:支持多环境配置,便于测试和部署 - 蓝图组织:模块化路由管理,代码结构清晰 - 服务层设计:业务逻辑与视图层分离,提高代码复用性 - 缓存策略:多层缓存设计,显著提升系统性能
功能实现亮点: - 富文本编辑:集成CKEditor,支持图片上传和格式化 - 全文搜索:基于Elasticsearch的高性能搜索 - 异步任务:使用Celery处理邮件发送等耗时操作 - 实时功能:WebSocket支持实时评论和通知
性能优化亮点: - 数据库优化:索引设计、查询优化、连接池配置 - 缓存优化:Redis缓存、页面缓存、对象缓存 - 前端优化:资源压缩、懒加载、CDN集成 - 监控体系:性能监控、错误追踪、日志分析
12.14.3 项目价值
教育价值: 1. 完整的学习路径:从基础到高级的系统性学习 2. 实战经验积累:真实项目开发经验 3. 最佳实践示范:行业标准的代码规范和架构设计 4. 问题解决能力:常见问题的解决方案和思路
商业价值: 1. 可直接使用:稍作定制即可用于实际项目 2. 技术选型参考:为类似项目提供技术选型依据 3. 架构模板:可作为其他Web项目的架构模板 4. 团队培训:可用于团队技术培训和知识分享
12.14.4 未来展望
短期规划(1-3个月): 1. 移动端适配:开发响应式设计,优化移动端体验 2. API完善:扩展RESTful API,支持第三方集成 3. 性能优化:进一步优化数据库查询和缓存策略 4. 安全加固:增强安全防护,通过安全审计
中期规划(3-6个月): 1. 微服务改造:拆分为用户服务、内容服务、搜索服务 2. 云原生部署:支持Kubernetes部署,实现自动扩缩容 3. 数据分析:集成数据分析功能,提供用户行为分析 4. AI功能:集成AI推荐系统和内容审核
长期规划(6-12个月): 1. 多租户支持:支持多站点管理,SaaS化部署 2. 国际化:支持多语言,面向全球用户 3. 生态建设:开发插件系统,支持第三方扩展 4. 商业化:开发付费功能,实现商业价值
12.14.5 技术发展趋势
Web开发趋势: 1. 前后端分离:API优先的开发模式 2. 微服务架构:服务拆分和治理 3. 云原生技术:容器化、服务网格、无服务器 4. AI集成:智能推荐、自动化运维
Flask生态发展: 1. 异步支持:Flask 2.0+的异步特性 2. 类型提示:更好的IDE支持和代码质量 3. 性能优化:ASGI支持,更好的并发性能 4. 生态完善:更多高质量的扩展包
12.14.6 学习建议
继续学习方向: 1. 深入Flask:学习Flask源码,理解框架原理 2. 扩展技术栈:学习FastAPI、Django等其他框架 3. 前端技术:深入学习React、Vue等现代前端框架 4. DevOps技能:掌握Kubernetes、监控、CI/CD等技术 5. 架构设计:学习分布式系统、微服务架构
实践建议: 1. 参与开源:为Flask生态贡献代码 2. 技术分享:写技术博客,分享学习心得 3. 项目实战:开发更多实际项目,积累经验 4. 社区参与:参加技术会议,与同行交流
通过本项目的学习和实践,相信大家已经掌握了Flask Web开发的核心技能,具备了开发现代化Web应用的能力。希望大家能够继续深入学习,在Web开发的道路上不断进步,创造出更多优秀的作品!