6.1 异步编程基础
6.1.1 JavaScript 异步模型
JavaScript 是单线程的,但通过事件循环机制支持异步操作:
// 同步 vs 异步
console.log('1. 同步操作开始');
// 异步操作
setTimeout(() => {
console.log('3. 异步操作执行');
}, 0);
console.log('2. 同步操作结束');
// 输出顺序:1 -> 2 -> 3
// 事件循环示例
function demonstrateEventLoop() {
console.log('Start');
// 宏任务
setTimeout(() => console.log('Timeout 1'), 0);
setTimeout(() => console.log('Timeout 2'), 0);
// 微任务
Promise.resolve().then(() => console.log('Promise 1'));
Promise.resolve().then(() => console.log('Promise 2'));
console.log('End');
// 输出:Start -> End -> Promise 1 -> Promise 2 -> Timeout 1 -> Timeout 2
}
6.1.2 回调函数的问题
// 回调地狱示例
function fetchUserData(userId: string, callback: (error: Error | null, user?: User) => void) {
setTimeout(() => {
if (userId === 'invalid') {
callback(new Error('Invalid user ID'));
} else {
callback(null, { id: userId, name: 'Alice', email: 'alice@example.com' });
}
}, 1000);
}
function fetchUserPosts(userId: string, callback: (error: Error | null, posts?: Post[]) => void) {
setTimeout(() => {
callback(null, [{ id: '1', title: 'Hello World', content: 'First post' }]);
}, 500);
}
function fetchPostComments(postId: string, callback: (error: Error | null, comments?: Comment[]) => void) {
setTimeout(() => {
callback(null, [{ id: '1', text: 'Great post!', author: 'Bob' }]);
}, 300);
}
// 回调地狱
fetchUserData('123', (userError, user) => {
if (userError) {
console.error('User error:', userError);
return;
}
fetchUserPosts(user!.id, (postsError, posts) => {
if (postsError) {
console.error('Posts error:', postsError);
return;
}
fetchPostComments(posts![0].id, (commentsError, comments) => {
if (commentsError) {
console.error('Comments error:', commentsError);
return;
}
console.log('User:', user);
console.log('Posts:', posts);
console.log('Comments:', comments);
});
});
});
6.2 Promise 详解
6.2.1 Promise 基础
// Promise 的三种状态
enum PromiseState {
Pending = 'pending',
Fulfilled = 'fulfilled',
Rejected = 'rejected'
}
// 创建 Promise
function createPromise<T>(value: T, delay: number = 1000): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(() => {
if (Math.random() > 0.1) {
resolve(value);
} else {
reject(new Error('Random failure'));
}
}, delay);
});
}
// 使用 Promise
const promise = createPromise('Hello, Promise!');
promise
.then(value => {
console.log('Success:', value);
return value.toUpperCase();
})
.then(upperValue => {
console.log('Transformed:', upperValue);
})
.catch(error => {
console.error('Error:', error.message);
})
.finally(() => {
console.log('Promise completed');
});
6.2.2 Promise 类型定义
// 自定义 Promise 类型
interface CustomPromise<T> {
then<U>(onFulfilled?: (value: T) => U | Promise<U>): CustomPromise<U>;
catch<U>(onRejected?: (reason: any) => U | Promise<U>): CustomPromise<T | U>;
finally(onFinally?: () => void): CustomPromise<T>;
}
// 泛型 Promise 工厂
class PromiseFactory {
static delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
static timeout<T>(promise: Promise<T>, ms: number): Promise<T> {
return Promise.race([
promise,
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), ms)
)
]);
}
static retry<T>(
fn: () => Promise<T>,
maxAttempts: number = 3,
delay: number = 1000
): Promise<T> {
return new Promise(async (resolve, reject) => {
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
const result = await fn();
resolve(result);
return;
} catch (error) {
if (attempt === maxAttempts) {
reject(error);
return;
}
await this.delay(delay * attempt);
}
}
});
}
}
// 使用示例
PromiseFactory.timeout(
fetch('https://api.example.com/data'),
5000
).then(response => response.json())
.catch(error => console.error('Request failed or timed out:', error));
PromiseFactory.retry(
() => fetch('https://unreliable-api.com/data').then(r => r.json()),
3,
1000
).then(data => console.log('Data:', data))
.catch(error => console.error('All attempts failed:', error));
6.2.3 Promise 组合方法
// Promise.all - 所有 Promise 都成功
interface User {
id: string;
name: string;
email: string;
}
interface Post {
id: string;
title: string;
content: string;
userId: string;
}
interface Comment {
id: string;
text: string;
postId: string;
author: string;
}
async function fetchUserData(userId: string): Promise<User> {
await PromiseFactory.delay(1000);
return { id: userId, name: 'Alice', email: 'alice@example.com' };
}
async function fetchUserPosts(userId: string): Promise<Post[]> {
await PromiseFactory.delay(800);
return [
{ id: '1', title: 'First Post', content: 'Hello World', userId },
{ id: '2', title: 'Second Post', content: 'TypeScript is great', userId }
];
}
async function fetchPostComments(postId: string): Promise<Comment[]> {
await PromiseFactory.delay(600);
return [
{ id: '1', text: 'Great post!', postId, author: 'Bob' },
{ id: '2', text: 'Thanks for sharing', postId, author: 'Charlie' }
];
}
// Promise.all 示例
async function loadUserDashboard(userId: string) {
try {
const [user, posts] = await Promise.all([
fetchUserData(userId),
fetchUserPosts(userId)
]);
console.log('User:', user);
console.log('Posts:', posts);
// 并行获取所有帖子的评论
const commentsPromises = posts.map(post => fetchPostComments(post.id));
const allComments = await Promise.all(commentsPromises);
console.log('All comments:', allComments.flat());
} catch (error) {
console.error('Failed to load dashboard:', error);
}
}
// Promise.allSettled - 等待所有 Promise 完成(无论成功或失败)
async function loadUserDashboardSafe(userId: string) {
const results = await Promise.allSettled([
fetchUserData(userId),
fetchUserPosts(userId),
fetch('https://api.example.com/notifications').then(r => r.json())
]);
results.forEach((result, index) => {
if (result.status === 'fulfilled') {
console.log(`Result ${index}:`, result.value);
} else {
console.error(`Error ${index}:`, result.reason);
}
});
}
// Promise.race - 第一个完成的 Promise
async function fetchWithFallback(primaryUrl: string, fallbackUrl: string) {
try {
const result = await Promise.race([
fetch(primaryUrl),
fetch(fallbackUrl)
]);
return await result.json();
} catch (error) {
console.error('Both requests failed:', error);
throw error;
}
}
// Promise.any - 第一个成功的 Promise
async function fetchFromMultipleSources(urls: string[]) {
try {
const response = await Promise.any(
urls.map(url => fetch(url))
);
return await response.json();
} catch (error) {
console.error('All requests failed:', error);
throw error;
}
}
6.3 async/await 详解
6.3.1 async/await 基础
// async 函数总是返回 Promise
async function simpleAsync(): Promise<string> {
return 'Hello, async!';
}
// 等价于
function simplePromise(): Promise<string> {
return Promise.resolve('Hello, async!');
}
// await 只能在 async 函数中使用
async function demonstrateAwait() {
try {
const result1 = await simpleAsync();
console.log(result1);
const result2 = await PromiseFactory.delay(1000).then(() => 'Delayed result');
console.log(result2);
// 并行执行
const [user, posts] = await Promise.all([
fetchUserData('123'),
fetchUserPosts('123')
]);
console.log('User and posts loaded:', { user, posts });
} catch (error) {
console.error('Error in async function:', error);
}
}
// 顶级 await(ES2022+)
// 在模块顶层可以直接使用 await
const config = await fetch('/api/config').then(r => r.json());
console.log('Config loaded:', config);
6.3.2 错误处理
// 多种错误处理方式
class ApiError extends Error {
constructor(
message: string,
public statusCode: number,
public response?: any
) {
super(message);
this.name = 'ApiError';
}
}
class NetworkError extends Error {
constructor(message: string, public originalError: Error) {
super(message);
this.name = 'NetworkError';
}
}
async function fetchWithErrorHandling(url: string): Promise<any> {
try {
const response = await fetch(url);
if (!response.ok) {
throw new ApiError(
`HTTP ${response.status}: ${response.statusText}`,
response.status,
await response.text()
);
}
return await response.json();
} catch (error) {
if (error instanceof TypeError) {
// 网络错误
throw new NetworkError('Network request failed', error);
}
// 重新抛出其他错误
throw error;
}
}
// 使用 try-catch 处理错误
async function handleApiCall() {
try {
const data = await fetchWithErrorHandling('https://api.example.com/data');
console.log('Data received:', data);
} catch (error) {
if (error instanceof ApiError) {
console.error(`API Error ${error.statusCode}: ${error.message}`);
if (error.statusCode === 401) {
// 处理认证错误
redirectToLogin();
}
} else if (error instanceof NetworkError) {
console.error('Network Error:', error.message);
showOfflineMessage();
} else {
console.error('Unexpected error:', error);
}
}
}
// 错误边界模式
class AsyncErrorBoundary {
static async safeExecute<T>(
fn: () => Promise<T>,
fallback?: T,
onError?: (error: Error) => void
): Promise<T | undefined> {
try {
return await fn();
} catch (error) {
if (onError) {
onError(error as Error);
} else {
console.error('Async operation failed:', error);
}
return fallback;
}
}
}
// 使用错误边界
const userData = await AsyncErrorBoundary.safeExecute(
() => fetchUserData('123'),
{ id: '123', name: 'Unknown', email: 'unknown@example.com' },
(error) => console.error('Failed to fetch user:', error)
);
function redirectToLogin() {
console.log('Redirecting to login...');
}
function showOfflineMessage() {
console.log('You appear to be offline');
}
6.3.3 并发控制
// 并发限制器
class ConcurrencyLimiter {
private running = 0;
private queue: Array<() => void> = [];
constructor(private maxConcurrency: number) {}
async execute<T>(fn: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
const task = async () => {
try {
this.running++;
const result = await fn();
resolve(result);
} catch (error) {
reject(error);
} finally {
this.running--;
this.processQueue();
}
};
if (this.running < this.maxConcurrency) {
task();
} else {
this.queue.push(task);
}
});
}
private processQueue() {
if (this.queue.length > 0 && this.running < this.maxConcurrency) {
const task = this.queue.shift()!;
task();
}
}
}
// 批处理器
class BatchProcessor<T, R> {
private batch: T[] = [];
private timer: NodeJS.Timeout | null = null;
constructor(
private batchSize: number,
private batchTimeout: number,
private processor: (items: T[]) => Promise<R[]>
) {}
async add(item: T): Promise<R> {
return new Promise((resolve, reject) => {
this.batch.push(item);
const itemIndex = this.batch.length - 1;
// 设置超时处理
if (this.timer) {
clearTimeout(this.timer);
}
this.timer = setTimeout(() => {
this.processBatch().then(results => {
resolve(results[itemIndex]);
}).catch(reject);
}, this.batchTimeout);
// 如果达到批次大小,立即处理
if (this.batch.length >= this.batchSize) {
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
this.processBatch().then(results => {
resolve(results[itemIndex]);
}).catch(reject);
}
});
}
private async processBatch(): Promise<R[]> {
const currentBatch = [...this.batch];
this.batch = [];
return await this.processor(currentBatch);
}
}
// 使用示例
const limiter = new ConcurrencyLimiter(3);
// 限制并发请求
const urls = [
'https://api.example.com/data1',
'https://api.example.com/data2',
'https://api.example.com/data3',
'https://api.example.com/data4',
'https://api.example.com/data5'
];
const results = await Promise.all(
urls.map(url =>
limiter.execute(() => fetch(url).then(r => r.json()))
)
);
console.log('All requests completed:', results);
// 批处理示例
const batchProcessor = new BatchProcessor(
5, // 批次大小
1000, // 超时时间
async (userIds: string[]) => {
// 批量获取用户数据
const response = await fetch('/api/users/batch', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ userIds })
});
return await response.json();
}
);
// 添加单个用户请求,会自动批处理
const user1 = await batchProcessor.add('user1');
const user2 = await batchProcessor.add('user2');
const user3 = await batchProcessor.add('user3');
6.4 生成器和异步迭代器
6.4.1 生成器函数
// 基本生成器
function* numberGenerator(): Generator<number, void, unknown> {
let i = 0;
while (true) {
yield i++;
}
}
const numbers = numberGenerator();
console.log(numbers.next().value); // 0
console.log(numbers.next().value); // 1
console.log(numbers.next().value); // 2
// 有限生成器
function* rangeGenerator(start: number, end: number): Generator<number, number, unknown> {
for (let i = start; i <= end; i++) {
yield i;
}
return end - start + 1; // 返回值
}
const range = rangeGenerator(1, 5);
for (const num of range) {
console.log(num); // 1, 2, 3, 4, 5
}
// 生成器可以接收值
function* twoWayGenerator(): Generator<number, void, string> {
let value = 0;
while (true) {
const input = yield value;
if (input) {
value = parseInt(input, 10) || 0;
} else {
value++;
}
}
}
const twoWay = twoWayGenerator();
console.log(twoWay.next().value); // 0
console.log(twoWay.next('10').value); // 10
console.log(twoWay.next().value); // 11
6.4.2 异步生成器
// 异步生成器
async function* asyncNumberGenerator(): AsyncGenerator<number, void, unknown> {
let i = 0;
while (i < 5) {
await PromiseFactory.delay(1000);
yield i++;
}
}
// 使用异步生成器
async function consumeAsyncGenerator() {
for await (const num of asyncNumberGenerator()) {
console.log('Async number:', num);
}
}
// 数据流处理
async function* fetchDataStream(urls: string[]): AsyncGenerator<any, void, unknown> {
for (const url of urls) {
try {
const response = await fetch(url);
const data = await response.json();
yield data;
} catch (error) {
console.error(`Failed to fetch ${url}:`, error);
}
}
}
// 分页数据获取
async function* paginatedFetch<T>(
baseUrl: string,
pageSize: number = 10
): AsyncGenerator<T[], void, unknown> {
let page = 1;
let hasMore = true;
while (hasMore) {
try {
const response = await fetch(`${baseUrl}?page=${page}&size=${pageSize}`);
const data = await response.json();
if (data.items && data.items.length > 0) {
yield data.items;
hasMore = data.hasMore;
page++;
} else {
hasMore = false;
}
} catch (error) {
console.error('Pagination fetch error:', error);
hasMore = false;
}
}
}
// 使用分页获取
async function loadAllData() {
const allItems: any[] = [];
for await (const page of paginatedFetch('/api/items', 20)) {
allItems.push(...page);
console.log(`Loaded ${page.length} items, total: ${allItems.length}`);
}
return allItems;
}
6.4.3 异步迭代器模式
// 自定义异步迭代器
class AsyncDataLoader<T> implements AsyncIterable<T> {
constructor(
private dataSource: () => Promise<T[]>,
private batchSize: number = 10
) {}
async *[Symbol.asyncIterator](): AsyncIterator<T> {
const data = await this.dataSource();
for (let i = 0; i < data.length; i += this.batchSize) {
const batch = data.slice(i, i + this.batchSize);
for (const item of batch) {
yield item;
}
// 在批次之间添加延迟
if (i + this.batchSize < data.length) {
await PromiseFactory.delay(100);
}
}
}
}
// 实时数据流
class RealTimeDataStream<T> implements AsyncIterable<T> {
private listeners: Array<(data: T) => void> = [];
private buffer: T[] = [];
private isActive = false;
constructor(private source: EventSource | WebSocket) {
this.setupEventHandlers();
}
private setupEventHandlers() {
if (this.source instanceof EventSource) {
this.source.onmessage = (event) => {
const data = JSON.parse(event.data);
this.emit(data);
};
} else if (this.source instanceof WebSocket) {
this.source.onmessage = (event) => {
const data = JSON.parse(event.data);
this.emit(data);
};
}
}
private emit(data: T) {
this.buffer.push(data);
this.listeners.forEach(listener => listener(data));
}
async *[Symbol.asyncIterator](): AsyncIterator<T> {
this.isActive = true;
let index = 0;
while (this.isActive) {
if (index < this.buffer.length) {
yield this.buffer[index++];
} else {
// 等待新数据
await new Promise<void>(resolve => {
const listener = () => {
this.listeners = this.listeners.filter(l => l !== listener);
resolve();
};
this.listeners.push(listener);
});
}
}
}
stop() {
this.isActive = false;
}
}
// 使用示例
async function processDataStream() {
const loader = new AsyncDataLoader(
async () => {
// 模拟数据获取
return Array.from({ length: 100 }, (_, i) => ({ id: i, value: Math.random() }));
},
5
);
for await (const item of loader) {
console.log('Processing item:', item);
// 处理每个数据项
}
}
6.5 高级异步模式
6.5.1 发布订阅模式
// 类型安全的事件发射器
type EventMap = {
[K: string]: any[];
};
class TypedEventEmitter<T extends EventMap> {
private listeners: {
[K in keyof T]?: Array<(...args: T[K]) => void | Promise<void>>;
} = {};
on<K extends keyof T>(event: K, listener: (...args: T[K]) => void | Promise<void>): void {
if (!this.listeners[event]) {
this.listeners[event] = [];
}
this.listeners[event]!.push(listener);
}
off<K extends keyof T>(event: K, listener: (...args: T[K]) => void | Promise<void>): void {
if (this.listeners[event]) {
this.listeners[event] = this.listeners[event]!.filter(l => l !== listener);
}
}
async emit<K extends keyof T>(event: K, ...args: T[K]): Promise<void> {
if (this.listeners[event]) {
const promises = this.listeners[event]!.map(listener =>
Promise.resolve(listener(...args))
);
await Promise.all(promises);
}
}
once<K extends keyof T>(event: K): Promise<T[K]> {
return new Promise(resolve => {
const listener = (...args: T[K]) => {
this.off(event, listener);
resolve(args);
};
this.on(event, listener);
});
}
}
// 定义事件类型
interface AppEvents {
'user:login': [{ userId: string; timestamp: Date }];
'user:logout': [{ userId: string }];
'data:updated': [{ type: string; data: any }];
'error': [Error];
}
// 使用类型安全的事件发射器
const eventEmitter = new TypedEventEmitter<AppEvents>();
// 监听事件
eventEmitter.on('user:login', async ({ userId, timestamp }) => {
console.log(`User ${userId} logged in at ${timestamp}`);
// 异步处理登录逻辑
await updateUserStatus(userId, 'online');
});
eventEmitter.on('data:updated', async ({ type, data }) => {
console.log(`Data updated: ${type}`, data);
// 异步处理数据更新
await syncDataToServer(type, data);
});
// 发射事件
await eventEmitter.emit('user:login', { userId: '123', timestamp: new Date() });
await eventEmitter.emit('data:updated', { type: 'user', data: { name: 'Alice' } });
// 等待单次事件
const [loginData] = await eventEmitter.once('user:login');
console.log('User logged in:', loginData);
async function updateUserStatus(userId: string, status: string): Promise<void> {
console.log(`Updating user ${userId} status to ${status}`);
}
async function syncDataToServer(type: string, data: any): Promise<void> {
console.log(`Syncing ${type} data to server:`, data);
}
6.5.2 响应式编程模式
// 简单的 Observable 实现
class Observable<T> {
constructor(
private subscriber: (observer: Observer<T>) => (() => void) | void
) {}
subscribe(observer: Partial<Observer<T>>): Subscription {
const fullObserver: Observer<T> = {
next: observer.next || (() => {}),
error: observer.error || ((err) => { throw err; }),
complete: observer.complete || (() => {})
};
const unsubscribe = this.subscriber(fullObserver) || (() => {});
return {
unsubscribe
};
}
map<U>(fn: (value: T) => U): Observable<U> {
return new Observable<U>(observer => {
return this.subscribe({
next: value => observer.next(fn(value)),
error: err => observer.error(err),
complete: () => observer.complete()
}).unsubscribe;
});
}
filter(predicate: (value: T) => boolean): Observable<T> {
return new Observable<T>(observer => {
return this.subscribe({
next: value => {
if (predicate(value)) {
observer.next(value);
}
},
error: err => observer.error(err),
complete: () => observer.complete()
}).unsubscribe;
});
}
static fromPromise<T>(promise: Promise<T>): Observable<T> {
return new Observable<T>(observer => {
promise
.then(value => {
observer.next(value);
observer.complete();
})
.catch(err => observer.error(err));
});
}
static interval(ms: number): Observable<number> {
return new Observable<number>(observer => {
let count = 0;
const timer = setInterval(() => {
observer.next(count++);
}, ms);
return () => clearInterval(timer);
});
}
}
interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
complete: () => void;
}
interface Subscription {
unsubscribe: () => void;
}
// 使用 Observable
const numbers$ = Observable.interval(1000)
.map(n => n * 2)
.filter(n => n % 4 === 0);
const subscription = numbers$.subscribe({
next: value => console.log('Received:', value),
error: err => console.error('Error:', err),
complete: () => console.log('Completed')
});
// 5秒后取消订阅
setTimeout(() => {
subscription.unsubscribe();
console.log('Unsubscribed');
}, 5000);
// 从 Promise 创建 Observable
const userPromise = fetchUserData('123');
const user$ = Observable.fromPromise(userPromise);
user$.subscribe({
next: user => console.log('User loaded:', user),
error: err => console.error('Failed to load user:', err)
});
6.5.3 状态机模式
// 异步状态机
type State = 'idle' | 'loading' | 'success' | 'error';
type Event = 'FETCH' | 'SUCCESS' | 'ERROR' | 'RESET';
interface StateMachine<TState, TEvent, TContext> {
state: TState;
context: TContext;
transition(event: TEvent, payload?: any): Promise<void>;
onStateChange(callback: (state: TState, context: TContext) => void): void;
}
class AsyncStateMachine implements StateMachine<State, Event, { data?: any; error?: Error }> {
state: State = 'idle';
context: { data?: any; error?: Error } = {};
private listeners: Array<(state: State, context: { data?: any; error?: Error }) => void> = [];
async transition(event: Event, payload?: any): Promise<void> {
const previousState = this.state;
switch (this.state) {
case 'idle':
if (event === 'FETCH') {
this.state = 'loading';
this.context = {};
this.notifyListeners();
try {
const data = await this.fetchData(payload);
await this.transition('SUCCESS', data);
} catch (error) {
await this.transition('ERROR', error);
}
}
break;
case 'loading':
if (event === 'SUCCESS') {
this.state = 'success';
this.context = { data: payload };
} else if (event === 'ERROR') {
this.state = 'error';
this.context = { error: payload };
}
break;
case 'success':
case 'error':
if (event === 'RESET') {
this.state = 'idle';
this.context = {};
} else if (event === 'FETCH') {
this.state = 'loading';
this.context = {};
this.notifyListeners();
try {
const data = await this.fetchData(payload);
await this.transition('SUCCESS', data);
} catch (error) {
await this.transition('ERROR', error);
}
}
break;
}
if (this.state !== previousState) {
this.notifyListeners();
}
}
onStateChange(callback: (state: State, context: { data?: any; error?: Error }) => void): void {
this.listeners.push(callback);
}
private notifyListeners(): void {
this.listeners.forEach(listener => listener(this.state, this.context));
}
private async fetchData(url: string): Promise<any> {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
return await response.json();
}
}
// 使用状态机
const stateMachine = new AsyncStateMachine();
stateMachine.onStateChange((state, context) => {
console.log('State changed:', state, context);
switch (state) {
case 'loading':
showLoadingSpinner();
break;
case 'success':
hideLoadingSpinner();
displayData(context.data);
break;
case 'error':
hideLoadingSpinner();
showError(context.error);
break;
case 'idle':
hideLoadingSpinner();
clearDisplay();
break;
}
});
// 触发状态转换
await stateMachine.transition('FETCH', 'https://api.example.com/data');
function showLoadingSpinner() {
console.log('Showing loading spinner...');
}
function hideLoadingSpinner() {
console.log('Hiding loading spinner...');
}
function displayData(data: any) {
console.log('Displaying data:', data);
}
function showError(error: Error | undefined) {
console.error('Showing error:', error?.message);
}
function clearDisplay() {
console.log('Clearing display...');
}
6.6 性能优化
6.6.1 异步操作优化
// 缓存机制
class AsyncCache<K, V> {
private cache = new Map<K, Promise<V>>();
private ttl: number;
constructor(ttl: number = 5 * 60 * 1000) { // 默认5分钟
this.ttl = ttl;
}
async get(key: K, factory: () => Promise<V>): Promise<V> {
if (this.cache.has(key)) {
return this.cache.get(key)!;
}
const promise = factory();
this.cache.set(key, promise);
// 设置过期时间
setTimeout(() => {
this.cache.delete(key);
}, this.ttl);
try {
return await promise;
} catch (error) {
// 如果失败,立即从缓存中移除
this.cache.delete(key);
throw error;
}
}
clear(): void {
this.cache.clear();
}
delete(key: K): boolean {
return this.cache.delete(key);
}
}
// 请求去重
class RequestDeduplicator {
private pendingRequests = new Map<string, Promise<any>>();
async dedupe<T>(key: string, factory: () => Promise<T>): Promise<T> {
if (this.pendingRequests.has(key)) {
return this.pendingRequests.get(key)!;
}
const promise = factory();
this.pendingRequests.set(key, promise);
try {
const result = await promise;
return result;
} finally {
this.pendingRequests.delete(key);
}
}
}
// 使用示例
const cache = new AsyncCache<string, User>();
const deduplicator = new RequestDeduplicator();
async function getUser(userId: string): Promise<User> {
return cache.get(userId, () =>
deduplicator.dedupe(`user:${userId}`, () =>
fetchUserData(userId)
)
);
}
// 多次调用只会发起一次请求
const [user1, user2, user3] = await Promise.all([
getUser('123'),
getUser('123'),
getUser('123')
]);
console.log('All users are the same:', user1 === user2 && user2 === user3);
6.6.2 内存管理
// 弱引用缓存
class WeakAsyncCache<K extends object, V> {
private cache = new WeakMap<K, Promise<V>>();
async get(key: K, factory: () => Promise<V>): Promise<V> {
if (this.cache.has(key)) {
return this.cache.get(key)!;
}
const promise = factory();
this.cache.set(key, promise);
return promise;
}
}
// 资源清理
class ResourceManager {
private resources: Array<() => void | Promise<void>> = [];
register(cleanup: () => void | Promise<void>): void {
this.resources.push(cleanup);
}
async cleanup(): Promise<void> {
const cleanupPromises = this.resources.map(cleanup =>
Promise.resolve(cleanup())
);
await Promise.all(cleanupPromises);
this.resources = [];
}
}
// 使用 AbortController 取消请求
class CancellableRequest {
private controller: AbortController;
constructor() {
this.controller = new AbortController();
}
async fetch(url: string, options: RequestInit = {}): Promise<Response> {
return fetch(url, {
...options,
signal: this.controller.signal
});
}
cancel(): void {
this.controller.abort();
}
}
// 使用示例
const resourceManager = new ResourceManager();
async function setupAsyncOperation() {
const request = new CancellableRequest();
// 注册清理函数
resourceManager.register(() => {
request.cancel();
console.log('Request cancelled');
});
try {
const response = await request.fetch('https://api.example.com/data');
const data = await response.json();
return data;
} catch (error) {
if (error.name === 'AbortError') {
console.log('Request was cancelled');
} else {
console.error('Request failed:', error);
}
throw error;
}
}
// 在组件卸载或页面离开时清理资源
window.addEventListener('beforeunload', () => {
resourceManager.cleanup();
});
6.7 本章练习
练习 1:实现异步队列
// 实现一个异步队列,支持以下功能:
// 1. 添加任务到队列
// 2. 控制并发数量
// 3. 支持优先级
// 4. 支持任务取消
// 5. 提供进度回调
interface Task<T> {
id: string;
priority: number;
execute: () => Promise<T>;
onProgress?: (progress: number) => void;
}
class AsyncQueue<T> {
// 你的实现
}
// 使用示例
const queue = new AsyncQueue<any>(3); // 最大并发数为3
queue.add({
id: 'task1',
priority: 1,
execute: async () => {
// 模拟异步任务
await new Promise(resolve => setTimeout(resolve, 1000));
return 'Task 1 completed';
},
onProgress: (progress) => console.log(`Task 1 progress: ${progress}%`)
});
queue.add({
id: 'task2',
priority: 2,
execute: async () => {
await new Promise(resolve => setTimeout(resolve, 2000));
return 'Task 2 completed';
}
});
// 取消任务
queue.cancel('task1');
// 等待所有任务完成
const results = await queue.waitForAll();
console.log('All tasks completed:', results);
练习 2:实现数据流处理管道
// 实现一个数据流处理管道,支持:
// 1. 链式操作(map, filter, reduce等)
// 2. 异步处理
// 3. 错误处理
// 4. 背压控制
class AsyncPipeline<T> {
// 你的实现
}
// 使用示例
const pipeline = new AsyncPipeline<number>()
.map(async (x) => x * 2)
.filter(async (x) => x > 10)
.map(async (x) => ({ value: x, timestamp: Date.now() }))
.catch((error, item) => {
console.error('Error processing item:', item, error);
return null; // 跳过错误项
});
// 处理数据流
const input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
const results = await pipeline.process(input);
console.log('Pipeline results:', results);
练习 3:实现分布式锁
// 实现一个基于 Promise 的分布式锁机制:
// 1. 支持锁的获取和释放
// 2. 支持锁超时
// 3. 支持锁的重入
// 4. 提供锁状态查询
class DistributedLock {
// 你的实现
}
// 使用示例
const lock = new DistributedLock();
async function criticalSection(id: string) {
const lockKey = 'resource:123';
try {
await lock.acquire(lockKey, { timeout: 5000 });
console.log(`${id} acquired lock`);
// 执行关键代码
await new Promise(resolve => setTimeout(resolve, 2000));
console.log(`${id} finished critical section`);
} finally {
await lock.release(lockKey);
console.log(`${id} released lock`);
}
}
// 并发执行
Promise.all([
criticalSection('Process A'),
criticalSection('Process B'),
criticalSection('Process C')
]);
6.8 本章总结
本章深入探讨了 TypeScript 中的异步编程,包括:
- 异步编程基础:理解了 JavaScript 的异步模型和事件循环
- Promise 详解:掌握了 Promise 的使用、类型定义和组合方法
- async/await 详解:学习了现代异步编程语法和错误处理
- 生成器和异步迭代器:了解了数据流处理的高级模式
- 高级异步模式:掌握了发布订阅、响应式编程和状态机模式
- 性能优化:学习了缓存、去重和资源管理等优化技术
异步编程是现代 JavaScript/TypeScript 开发的核心技能,正确理解和使用这些概念对于构建高性能、可维护的应用程序至关重要。
下一章我们将学习错误处理和调试技巧,了解如何在 TypeScript 项目中有效地处理错误和进行调试。