开发环境准备
系统要求
硬件要求
- CPU:双核2.0GHz以上
- 内存:4GB以上(推荐8GB)
- 存储:至少10GB可用空间
- 网络:稳定的互联网连接
操作系统支持
- Windows:Windows 10⁄11 (64位)
- macOS:macOS 10.15 Catalina及以上
- Linux:Ubuntu 18.04+, CentOS 7+, Debian 10+
基础工具安装
1. Node.js环境
Windows安装:
# 使用Chocolatey安装
choco install nodejs
# 或者下载官方安装包
# 访问 https://nodejs.org/
macOS安装:
# 使用Homebrew安装
brew install node
# 或使用官方安装包
# 访问 https://nodejs.org/
Linux安装:
# Ubuntu/Debian
curl -fsSL https://deb.nodesource.com/setup_lts.x | sudo -E bash -
sudo apt-get install -y nodejs
# CentOS/RHEL
curl -fsSL https://rpm.nodesource.com/setup_lts.x | sudo bash -
sudo yum install -y nodejs
验证安装:
node --version # 应显示 v18.0.0 或更高版本
npm --version # 应显示 8.0.0 或更高版本
2. Python环境
Windows安装:
# 使用Chocolatey安装
choco install python
# 或从Microsoft Store安装
# 或下载官方安装包
macOS安装:
# 使用Homebrew安装
brew install python@3.11
# 或使用pyenv
brew install pyenv
pyenv install 3.11.0
pyenv global 3.11.0
Linux安装:
# Ubuntu/Debian
sudo apt update
sudo apt install python3 python3-pip python3-venv
# CentOS/RHEL
sudo yum install python3 python3-pip
验证安装:
python3 --version # 应显示 Python 3.8+
pip3 --version # 应显示对应的pip版本
3. Git版本控制
安装Git:
# Windows (Chocolatey)
choco install git
# macOS (Homebrew)
brew install git
# Ubuntu/Debian
sudo apt install git
# CentOS/RHEL
sudo yum install git
配置Git:
git config --global user.name "Your Name"
git config --global user.email "your.email@example.com"
MCP开发工具安装
1. MCP CLI工具
# 全局安装MCP CLI
npm install -g @modelcontextprotocol/cli
# 验证安装
mcp --version
2. MCP Python SDK
# 创建虚拟环境
python3 -m venv mcp-env
# 激活虚拟环境
# Windows
mcp-env\Scripts\activate
# macOS/Linux
source mcp-env/bin/activate
# 安装MCP Python SDK
pip install mcp
# 验证安装
python -c "import mcp; print(mcp.__version__)"
3. MCP TypeScript SDK
# 创建新项目
mkdir my-mcp-project
cd my-mcp-project
# 初始化npm项目
npm init -y
# 安装MCP TypeScript SDK
npm install @modelcontextprotocol/sdk
npm install -D typescript @types/node
# 创建TypeScript配置
npx tsc --init
代码编辑器配置
Visual Studio Code
安装VS Code: - 访问 https://code.visualstudio.com/ - 下载并安装对应平台的版本
推荐扩展:
# 安装推荐扩展
code --install-extension ms-python.python
code --install-extension ms-vscode.vscode-typescript-next
code --install-extension ms-vscode.vscode-json
code --install-extension redhat.vscode-yaml
code --install-extension ms-vscode.vscode-eslint
code --install-extension esbenp.prettier-vscode
VS Code配置文件 (.vscode/settings.json
):
{
"python.defaultInterpreterPath": "./mcp-env/bin/python",
"python.linting.enabled": true,
"python.linting.pylintEnabled": true,
"typescript.preferences.importModuleSpecifier": "relative",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
},
"files.associations": {
"*.mcp": "json"
}
}
JetBrains IDEs配置
PyCharm配置: 1. 打开PyCharm 2. 配置Python解释器:File → Settings → Project → Python Interpreter 3. 选择虚拟环境中的Python解释器 4. 安装MCP插件(如果可用)
WebStorm配置: 1. 打开WebStorm 2. 配置Node.js:File → Settings → Languages & Frameworks → Node.js 3. 设置代码风格:File → Settings → Editor → Code Style
创建第一个MCP服务器
Python实现
1. 项目结构
my-first-mcp-server/
├── src/
│ ├── __init__.py
│ └── server.py
├── requirements.txt
├── README.md
└── pyproject.toml
2. 依赖配置
requirements.txt:
mcp>=1.0.0
fastapi>=0.104.0
uvicorn>=0.24.0
pydantic>=2.0.0
aiofiles>=23.0.0
requests>=2.31.0
pyproject.toml:
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "my-first-mcp-server"
version = "0.1.0"
description = "我的第一个MCP服务器"
authors = [{name = "Your Name", email = "your.email@example.com"}]
requires-python = ">=3.8"
dependencies = [
"mcp>=1.0.0",
"fastapi>=0.104.0",
"uvicorn>=0.24.0",
]
[project.scripts]
my-mcp-server = "src.server:main"
3. 服务器实现
src/server.py:
#!/usr/bin/env python3
"""
我的第一个MCP服务器
提供基本的工具和资源功能
"""
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from mcp import ClientSession, StdioServerSession
from mcp.server import Server
from mcp.server.models import (
InitializeResult,
Tool,
TextContent,
ImageContent,
EmbedContent,
Resource,
Prompt,
PromptMessage,
GetPromptResult,
)
from mcp.types import (
CallToolResult,
ListResourcesResult,
ListToolsResult,
ListPromptsResult,
ReadResourceResult,
TextResourceContents,
ImageResourceContents,
BlobResourceContents,
)
from pydantic import BaseModel, Field
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 创建MCP服务器实例
server = Server("my-first-mcp-server")
# 工具定义
class CalculatorInput(BaseModel):
"""计算器输入参数"""
expression: str = Field(description="要计算的数学表达式")
class TimeInput(BaseModel):
"""时间查询输入参数"""
timezone: Optional[str] = Field(default="UTC", description="时区")
format: Optional[str] = Field(default="%Y-%m-%d %H:%M:%S", description="时间格式")
class FileInput(BaseModel):
"""文件操作输入参数"""
path: str = Field(description="文件路径")
content: Optional[str] = Field(default=None, description="文件内容(写入时使用)")
# 初始化处理
@server.list_tools()
async def handle_list_tools() -> ListToolsResult:
"""返回可用工具列表"""
return ListToolsResult(
tools=[
Tool(
name="calculator",
description="执行数学计算",
inputSchema=CalculatorInput.model_json_schema(),
),
Tool(
name="get_time",
description="获取当前时间",
inputSchema=TimeInput.model_json_schema(),
),
Tool(
name="read_file",
description="读取文件内容",
inputSchema=FileInput.model_json_schema(),
),
Tool(
name="write_file",
description="写入文件内容",
inputSchema=FileInput.model_json_schema(),
),
]
)
# 工具调用处理
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> CallToolResult:
"""处理工具调用"""
try:
if name == "calculator":
return await handle_calculator(arguments)
elif name == "get_time":
return await handle_get_time(arguments)
elif name == "read_file":
return await handle_read_file(arguments)
elif name == "write_file":
return await handle_write_file(arguments)
else:
raise ValueError(f"未知工具: {name}")
except Exception as e:
logger.error(f"工具调用错误 {name}: {e}")
return CallToolResult(
content=[
TextContent(
type="text",
text=f"工具执行失败: {str(e)}"
)
],
isError=True
)
async def handle_calculator(arguments: Dict[str, Any]) -> CallToolResult:
"""处理计算器工具"""
input_data = CalculatorInput(**arguments)
try:
# 安全的数学表达式计算
allowed_names = {
k: v for k, v in __builtins__.items()
if k in ['abs', 'round', 'min', 'max', 'sum', 'pow']
}
allowed_names.update({
'pi': 3.141592653589793,
'e': 2.718281828459045,
})
result = eval(input_data.expression, {"__builtins__": {}}, allowed_names)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"计算结果: {input_data.expression} = {result}"
)
]
)
except Exception as e:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"计算错误: {str(e)}"
)
],
isError=True
)
async def handle_get_time(arguments: Dict[str, Any]) -> CallToolResult:
"""处理时间查询工具"""
input_data = TimeInput(**arguments)
try:
from datetime import datetime
import pytz
if input_data.timezone == "UTC":
now = datetime.utcnow()
else:
tz = pytz.timezone(input_data.timezone)
now = datetime.now(tz)
formatted_time = now.strftime(input_data.format)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"当前时间 ({input_data.timezone}): {formatted_time}"
)
]
)
except Exception as e:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"时间查询错误: {str(e)}"
)
],
isError=True
)
async def handle_read_file(arguments: Dict[str, Any]) -> CallToolResult:
"""处理文件读取工具"""
input_data = FileInput(**arguments)
try:
file_path = Path(input_data.path)
if not file_path.exists():
return CallToolResult(
content=[
TextContent(
type="text",
text=f"文件不存在: {input_data.path}"
)
],
isError=True
)
if not file_path.is_file():
return CallToolResult(
content=[
TextContent(
type="text",
text=f"路径不是文件: {input_data.path}"
)
],
isError=True
)
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return CallToolResult(
content=[
TextContent(
type="text",
text=f"文件内容 ({input_data.path}):\n\n{content}"
)
]
)
except Exception as e:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"文件读取错误: {str(e)}"
)
],
isError=True
)
async def handle_write_file(arguments: Dict[str, Any]) -> CallToolResult:
"""处理文件写入工具"""
input_data = FileInput(**arguments)
if not input_data.content:
return CallToolResult(
content=[
TextContent(
type="text",
text="写入文件需要提供content参数"
)
],
isError=True
)
try:
file_path = Path(input_data.path)
# 确保目录存在
file_path.parent.mkdir(parents=True, exist_ok=True)
# 写入文件
with open(file_path, 'w', encoding='utf-8') as f:
f.write(input_data.content)
return CallToolResult(
content=[
TextContent(
type="text",
text=f"文件写入成功: {input_data.path}\n写入了 {len(input_data.content)} 个字符"
)
]
)
except Exception as e:
return CallToolResult(
content=[
TextContent(
type="text",
text=f"文件写入错误: {str(e)}"
)
],
isError=True
)
# 资源处理
@server.list_resources()
async def handle_list_resources() -> ListResourcesResult:
"""返回可用资源列表"""
return ListResourcesResult(
resources=[
Resource(
uri="file://./README.md",
name="项目说明",
description="项目的README文档",
mimeType="text/markdown"
),
Resource(
uri="config://server",
name="服务器配置",
description="当前服务器的配置信息",
mimeType="application/json"
),
]
)
@server.read_resource()
async def handle_read_resource(uri: str) -> ReadResourceResult:
"""读取资源内容"""
try:
if uri == "file://./README.md":
readme_content = """
# 我的第一个MCP服务器
这是一个简单的MCP服务器示例,提供以下功能:
## 工具
- **calculator**: 执行数学计算
- **get_time**: 获取当前时间
- **read_file**: 读取文件内容
- **write_file**: 写入文件内容
## 资源
- **README.md**: 项目说明文档
- **server配置**: 服务器配置信息
## 使用方法
1. 启动服务器
2. 连接MCP客户端
3. 调用相应的工具和资源
"""
return ReadResourceResult(
contents=[
TextResourceContents(
uri=uri,
mimeType="text/markdown",
text=readme_content
)
]
)
elif uri == "config://server":
config = {
"server_name": "my-first-mcp-server",
"version": "0.1.0",
"protocol_version": "2024-11-05",
"capabilities": {
"tools": True,
"resources": True,
"prompts": True
},
"startup_time": datetime.now().isoformat()
}
return ReadResourceResult(
contents=[
TextResourceContents(
uri=uri,
mimeType="application/json",
text=json.dumps(config, indent=2, ensure_ascii=False)
)
]
)
else:
raise ValueError(f"未知资源: {uri}")
except Exception as e:
logger.error(f"资源读取错误 {uri}: {e}")
raise
# 提示处理
@server.list_prompts()
async def handle_list_prompts() -> ListPromptsResult:
"""返回可用提示列表"""
return ListPromptsResult(
prompts=[
Prompt(
name="greeting",
description="生成问候语",
arguments=[
{
"name": "name",
"description": "要问候的人的姓名",
"required": True
},
{
"name": "time_of_day",
"description": "时间段(morning/afternoon/evening)",
"required": False
}
]
),
Prompt(
name="code_review",
description="代码审查提示",
arguments=[
{
"name": "code",
"description": "要审查的代码",
"required": True
},
{
"name": "language",
"description": "编程语言",
"required": False
}
]
)
]
)
@server.get_prompt()
async def handle_get_prompt(name: str, arguments: Dict[str, str]) -> GetPromptResult:
"""获取提示内容"""
try:
if name == "greeting":
user_name = arguments.get("name", "朋友")
time_of_day = arguments.get("time_of_day", "")
if time_of_day == "morning":
greeting = f"早上好,{user_name}!希望你今天有个美好的开始。"
elif time_of_day == "afternoon":
greeting = f"下午好,{user_name}!希望你下午工作顺利。"
elif time_of_day == "evening":
greeting = f"晚上好,{user_name}!希望你有个愉快的夜晚。"
else:
greeting = f"你好,{user_name}!很高兴见到你。"
return GetPromptResult(
description="个性化问候语",
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=greeting
)
)
]
)
elif name == "code_review":
code = arguments.get("code", "")
language = arguments.get("language", "未指定")
prompt_text = f"""
请审查以下{language}代码:
```{language.lower() if language != "未指定" else ""}
{code}
请提供: 1. 代码质量评估 2. 潜在问题识别 3. 改进建议 4. 最佳实践推荐
请确保你的反馈具体、可操作,并解释你的建议背后的原因。 “””
return GetPromptResult(
description="代码审查提示",
messages=[
PromptMessage(
role="user",
content=TextContent(
type="text",
text=prompt_text
)
)
]
)
else:
raise ValueError(f"未知提示: {name}")
except Exception as e:
logger.error(f"提示获取错误 {name}: {e}")
raise
主函数
async def main(): “”“启动MCP服务器”“” logger.info(“启动MCP服务器…”)
# 使用stdio传输
async with StdioServerSession(server) as session:
await session.run()
if name == “main”: asyncio.run(main())
#### 4. 安装和运行
```bash
# 安装依赖
pip install -r requirements.txt
# 运行服务器
python src/server.py
TypeScript实现
1. 项目结构
my-first-mcp-server-ts/
├── src/
│ ├── index.ts
│ ├── tools/
│ │ ├── calculator.ts
│ │ ├── time.ts
│ │ └── file.ts
│ ├── resources/
│ │ └── config.ts
│ └── prompts/
│ └── templates.ts
├── package.json
├── tsconfig.json
├── README.md
└── .gitignore
2. 依赖配置
package.json:
{
"name": "my-first-mcp-server-ts",
"version": "0.1.0",
"description": "我的第一个MCP服务器 (TypeScript)",
"main": "dist/index.js",
"scripts": {
"build": "tsc",
"start": "node dist/index.js",
"dev": "ts-node src/index.ts",
"watch": "tsc --watch",
"clean": "rm -rf dist"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0",
"zod": "^3.22.0"
},
"devDependencies": {
"@types/node": "^20.0.0",
"typescript": "^5.0.0",
"ts-node": "^10.9.0"
},
"engines": {
"node": ">=18.0.0"
}
}
tsconfig.json:
{
"compilerOptions": {
"target": "ES2022",
"module": "commonjs",
"lib": ["ES2022"],
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"declaration": true,
"declarationMap": true,
"sourceMap": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
3. 服务器实现
src/index.ts:
#!/usr/bin/env node
/**
* 我的第一个MCP服务器 (TypeScript版本)
* 提供基本的工具和资源功能
*/
import { Server } from '@modelcontextprotocol/sdk/server/index.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import {
CallToolRequestSchema,
ListToolsRequestSchema,
ListResourcesRequestSchema,
ReadResourceRequestSchema,
ListPromptsRequestSchema,
GetPromptRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';
// 工具参数模式定义
const CalculatorArgsSchema = z.object({
expression: z.string().describe('要计算的数学表达式'),
});
const TimeArgsSchema = z.object({
timezone: z.string().default('UTC').describe('时区'),
format: z.string().default('%Y-%m-%d %H:%M:%S').describe('时间格式'),
});
const FileArgsSchema = z.object({
path: z.string().describe('文件路径'),
content: z.string().optional().describe('文件内容(写入时使用)'),
});
// 创建服务器实例
const server = new Server(
{
name: 'my-first-mcp-server-ts',
version: '0.1.0',
},
{
capabilities: {
tools: {},
resources: {},
prompts: {},
},
}
);
// 工具列表处理
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: 'calculator',
description: '执行数学计算',
inputSchema: {
type: 'object',
properties: {
expression: {
type: 'string',
description: '要计算的数学表达式',
},
},
required: ['expression'],
},
},
{
name: 'get_time',
description: '获取当前时间',
inputSchema: {
type: 'object',
properties: {
timezone: {
type: 'string',
description: '时区',
default: 'UTC',
},
format: {
type: 'string',
description: '时间格式',
default: '%Y-%m-%d %H:%M:%S',
},
},
},
},
{
name: 'read_file',
description: '读取文件内容',
inputSchema: {
type: 'object',
properties: {
path: {
type: 'string',
description: '文件路径',
},
},
required: ['path'],
},
},
{
name: 'write_file',
description: '写入文件内容',
inputSchema: {
type: 'object',
properties: {
path: {
type: 'string',
description: '文件路径',
},
content: {
type: 'string',
description: '文件内容',
},
},
required: ['path', 'content'],
},
},
],
};
});
// 工具调用处理
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
switch (name) {
case 'calculator':
return await handleCalculator(args);
case 'get_time':
return await handleGetTime(args);
case 'read_file':
return await handleReadFile(args);
case 'write_file':
return await handleWriteFile(args);
default:
throw new Error(`未知工具: ${name}`);
}
} catch (error) {
return {
content: [
{
type: 'text',
text: `工具执行失败: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
});
// 计算器工具实现
async function handleCalculator(args: unknown) {
const { expression } = CalculatorArgsSchema.parse(args);
try {
// 安全的数学表达式计算
const allowedOperations = /^[0-9+\-*/().\s]+$/;
if (!allowedOperations.test(expression)) {
throw new Error('表达式包含不允许的字符');
}
const result = Function(`"use strict"; return (${expression})`)();
if (typeof result !== 'number' || !isFinite(result)) {
throw new Error('计算结果无效');
}
return {
content: [
{
type: 'text',
text: `计算结果: ${expression} = ${result}`,
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: `计算错误: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
}
// 时间查询工具实现
async function handleGetTime(args: unknown) {
const { timezone, format } = TimeArgsSchema.parse(args);
try {
const now = new Date();
let timeString: string;
if (timezone === 'UTC') {
timeString = now.toISOString();
} else {
// 简化的时区处理
timeString = now.toLocaleString('zh-CN', {
timeZone: timezone,
year: 'numeric',
month: '2-digit',
day: '2-digit',
hour: '2-digit',
minute: '2-digit',
second: '2-digit',
});
}
return {
content: [
{
type: 'text',
text: `当前时间 (${timezone}): ${timeString}`,
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: `时间查询错误: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
}
// 文件读取工具实现
async function handleReadFile(args: unknown) {
const { path } = FileArgsSchema.parse(args);
try {
const fs = await import('fs/promises');
const content = await fs.readFile(path, 'utf-8');
return {
content: [
{
type: 'text',
text: `文件内容 (${path}):\n\n${content}`,
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: `文件读取错误: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
}
// 文件写入工具实现
async function handleWriteFile(args: unknown) {
const { path, content } = FileArgsSchema.parse(args);
if (!content) {
return {
content: [
{
type: 'text',
text: '写入文件需要提供content参数',
},
],
isError: true,
};
}
try {
const fs = await import('fs/promises');
const pathModule = await import('path');
// 确保目录存在
const dir = pathModule.dirname(path);
await fs.mkdir(dir, { recursive: true });
// 写入文件
await fs.writeFile(path, content, 'utf-8');
return {
content: [
{
type: 'text',
text: `文件写入成功: ${path}\n写入了 ${content.length} 个字符`,
},
],
};
} catch (error) {
return {
content: [
{
type: 'text',
text: `文件写入错误: ${error instanceof Error ? error.message : String(error)}`,
},
],
isError: true,
};
}
}
// 资源列表处理
server.setRequestHandler(ListResourcesRequestSchema, async () => {
return {
resources: [
{
uri: 'file://./README.md',
name: '项目说明',
description: '项目的README文档',
mimeType: 'text/markdown',
},
{
uri: 'config://server',
name: '服务器配置',
description: '当前服务器的配置信息',
mimeType: 'application/json',
},
],
};
});
// 资源读取处理
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const { uri } = request.params;
switch (uri) {
case 'file://./README.md':
const readmeContent = `
# 我的第一个MCP服务器 (TypeScript)
这是一个简单的MCP服务器示例,使用TypeScript实现。
## 功能特性
### 工具
- **calculator**: 执行数学计算
- **get_time**: 获取当前时间
- **read_file**: 读取文件内容
- **write_file**: 写入文件内容
### 资源
- **README.md**: 项目说明文档
- **server配置**: 服务器配置信息
## 技术栈
- TypeScript
- MCP SDK
- Zod (参数验证)
## 使用方法
1. 安装依赖: \`npm install\`
2. 构建项目: \`npm run build\`
3. 启动服务器: \`npm start\`
`;
return {
contents: [
{
uri,
mimeType: 'text/markdown',
text: readmeContent,
},
],
};
case 'config://server':
const config = {
server_name: 'my-first-mcp-server-ts',
version: '0.1.0',
protocol_version: '2024-11-05',
capabilities: {
tools: true,
resources: true,
prompts: true,
},
startup_time: new Date().toISOString(),
runtime: 'Node.js',
language: 'TypeScript',
};
return {
contents: [
{
uri,
mimeType: 'application/json',
text: JSON.stringify(config, null, 2),
},
],
};
default:
throw new Error(`未知资源: ${uri}`);
}
});
// 提示列表处理
server.setRequestHandler(ListPromptsRequestSchema, async () => {
return {
prompts: [
{
name: 'greeting',
description: '生成问候语',
arguments: [
{
name: 'name',
description: '要问候的人的姓名',
required: true,
},
{
name: 'time_of_day',
description: '时间段(morning/afternoon/evening)',
required: false,
},
],
},
{
name: 'code_review',
description: '代码审查提示',
arguments: [
{
name: 'code',
description: '要审查的代码',
required: true,
},
{
name: 'language',
description: '编程语言',
required: false,
},
],
},
],
};
});
// 提示获取处理
server.setRequestHandler(GetPromptRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case 'greeting':
const userName = args?.name || '朋友';
const timeOfDay = args?.time_of_day || '';
let greeting: string;
if (timeOfDay === 'morning') {
greeting = `早上好,${userName}!希望你今天有个美好的开始。`;
} else if (timeOfDay === 'afternoon') {
greeting = `下午好,${userName}!希望你下午工作顺利。`;
} else if (timeOfDay === 'evening') {
greeting = `晚上好,${userName}!希望你有个愉快的夜晚。`;
} else {
greeting = `你好,${userName}!很高兴见到你。`;
}
return {
description: '个性化问候语',
messages: [
{
role: 'user',
content: {
type: 'text',
text: greeting,
},
},
],
};
case 'code_review':
const code = args?.code || '';
const language = args?.language || '未指定';
const promptText = `
请审查以下${language}代码:
\`\`\`${language.toLowerCase() !== '未指定' ? language.toLowerCase() : ''}
${code}
\`\`\`
请提供:
1. 代码质量评估
2. 潜在问题识别
3. 改进建议
4. 最佳实践推荐
请确保你的反馈具体、可操作,并解释你的建议背后的原因。
`;
return {
description: '代码审查提示',
messages: [
{
role: 'user',
content: {
type: 'text',
text: promptText,
},
},
],
};
default:
throw new Error(`未知提示: ${name}`);
}
});
// 启动服务器
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error('MCP服务器已启动');
}
if (require.main === module) {
main().catch((error) => {
console.error('服务器启动失败:', error);
process.exit(1);
});
}
4. 构建和运行
# 安装依赖
npm install
# 构建项目
npm run build
# 运行服务器
npm start
# 或者开发模式
npm run dev
创建第一个MCP客户端
Python客户端示例
client.py:
#!/usr/bin/env python3
"""
MCP客户端示例
连接到MCP服务器并调用工具
"""
import asyncio
import json
from mcp import ClientSession, StdioClientTransport
async def main():
"""主函数"""
# 创建传输层(连接到服务器)
transport = StdioClientTransport(
command="python",
args=["src/server.py"],
cwd="../my-first-mcp-server"
)
# 创建客户端会话
async with ClientSession(transport) as session:
# 初始化连接
await session.initialize()
print("=== MCP客户端示例 ===")
# 1. 获取工具列表
print("\n1. 获取工具列表:")
tools_result = await session.list_tools()
for tool in tools_result.tools:
print(f" - {tool.name}: {tool.description}")
# 2. 调用计算器工具
print("\n2. 调用计算器工具:")
calc_result = await session.call_tool(
"calculator",
{"expression": "2 + 3 * 4"}
)
for content in calc_result.content:
if content.type == "text":
print(f" 结果: {content.text}")
# 3. 获取当前时间
print("\n3. 获取当前时间:")
time_result = await session.call_tool(
"get_time",
{"timezone": "Asia/Shanghai", "format": "%Y-%m-%d %H:%M:%S"}
)
for content in time_result.content:
if content.type == "text":
print(f" 时间: {content.text}")
# 4. 获取资源列表
print("\n4. 获取资源列表:")
resources_result = await session.list_resources()
for resource in resources_result.resources:
print(f" - {resource.name}: {resource.description}")
# 5. 读取资源
print("\n5. 读取服务器配置:")
config_result = await session.read_resource("config://server")
for content in config_result.contents:
if hasattr(content, 'text'):
config_data = json.loads(content.text)
print(f" 服务器名称: {config_data['server_name']}")
print(f" 版本: {config_data['version']}")
print(f" 启动时间: {config_data['startup_time']}")
# 6. 获取提示列表
print("\n6. 获取提示列表:")
prompts_result = await session.list_prompts()
for prompt in prompts_result.prompts:
print(f" - {prompt.name}: {prompt.description}")
# 7. 使用提示
print("\n7. 使用问候提示:")
greeting_result = await session.get_prompt(
"greeting",
{"name": "张三", "time_of_day": "morning"}
)
for message in greeting_result.messages:
if hasattr(message.content, 'text'):
print(f" 问候语: {message.content.text}")
if __name__ == "__main__":
asyncio.run(main())
TypeScript客户端示例
client.ts:
#!/usr/bin/env node
/**
* MCP客户端示例 (TypeScript)
* 连接到MCP服务器并调用工具
*/
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
async function main() {
// 创建传输层(连接到服务器)
const transport = new StdioClientTransport({
command: 'node',
args: ['dist/index.js'],
cwd: '../my-first-mcp-server-ts',
});
// 创建客户端
const client = new Client(
{
name: 'mcp-client-example',
version: '1.0.0',
},
{
capabilities: {},
}
);
try {
// 连接到服务器
await client.connect(transport);
console.log('=== MCP客户端示例 (TypeScript) ===');
// 1. 获取工具列表
console.log('\n1. 获取工具列表:');
const toolsResult = await client.request(
{ method: 'tools/list' },
{ method: 'tools/list' }
);
if ('tools' in toolsResult) {
for (const tool of toolsResult.tools) {
console.log(` - ${tool.name}: ${tool.description}`);
}
}
// 2. 调用计算器工具
console.log('\n2. 调用计算器工具:');
const calcResult = await client.request(
{
method: 'tools/call',
params: {
name: 'calculator',
arguments: { expression: '10 * 5 + 2' },
},
},
{ method: 'tools/call' }
);
if ('content' in calcResult) {
for (const content of calcResult.content) {
if (content.type === 'text') {
console.log(` 结果: ${content.text}`);
}
}
}
// 3. 获取当前时间
console.log('\n3. 获取当前时间:');
const timeResult = await client.request(
{
method: 'tools/call',
params: {
name: 'get_time',
arguments: { timezone: 'Asia/Shanghai' },
},
},
{ method: 'tools/call' }
);
if ('content' in timeResult) {
for (const content of timeResult.content) {
if (content.type === 'text') {
console.log(` 时间: ${content.text}`);
}
}
}
// 4. 获取资源列表
console.log('\n4. 获取资源列表:');
const resourcesResult = await client.request(
{ method: 'resources/list' },
{ method: 'resources/list' }
);
if ('resources' in resourcesResult) {
for (const resource of resourcesResult.resources) {
console.log(` - ${resource.name}: ${resource.description}`);
}
}
// 5. 读取资源
console.log('\n5. 读取服务器配置:');
const configResult = await client.request(
{
method: 'resources/read',
params: { uri: 'config://server' },
},
{ method: 'resources/read' }
);
if ('contents' in configResult) {
for (const content of configResult.contents) {
if ('text' in content) {
const configData = JSON.parse(content.text);
console.log(` 服务器名称: ${configData.server_name}`);
console.log(` 版本: ${configData.version}`);
console.log(` 启动时间: ${configData.startup_time}`);
}
}
}
console.log('\n客户端示例执行完成!');
} catch (error) {
console.error('客户端错误:', error);
} finally {
await client.close();
}
}
if (require.main === module) {
main().catch((error) => {
console.error('客户端启动失败:', error);
process.exit(1);
});
}
测试和调试
1. 使用MCP调试工具
# 安装调试工具
npm install -g @modelcontextprotocol/inspector
# 启动调试界面
mcp-inspector
2. 手动测试
测试脚本 (test.py):
#!/usr/bin/env python3
import asyncio
import json
from mcp import ClientSession, StdioClientTransport
async def test_server():
"""测试MCP服务器功能"""
transport = StdioClientTransport(
command="python",
args=["src/server.py"]
)
async with ClientSession(transport) as session:
await session.initialize()
# 测试用例
test_cases = [
{
"name": "计算器测试",
"tool": "calculator",
"args": {"expression": "2 + 2"}
},
{
"name": "时间查询测试",
"tool": "get_time",
"args": {"timezone": "UTC"}
},
{
"name": "文件写入测试",
"tool": "write_file",
"args": {
"path": "./test.txt",
"content": "Hello, MCP!"
}
},
{
"name": "文件读取测试",
"tool": "read_file",
"args": {"path": "./test.txt"}
}
]
for test_case in test_cases:
print(f"\n执行测试: {test_case['name']}")
try:
result = await session.call_tool(
test_case['tool'],
test_case['args']
)
if result.isError:
print(f" ❌ 测试失败")
else:
print(f" ✅ 测试通过")
for content in result.content:
if content.type == "text":
print(f" 输出: {content.text}")
except Exception as e:
print(f" ❌ 测试异常: {e}")
if __name__ == "__main__":
asyncio.run(test_server())
3. 单元测试
test_tools.py:
import pytest
import asyncio
from src.server import (
handle_calculator,
handle_get_time,
handle_read_file,
handle_write_file
)
class TestTools:
"""工具测试类"""
@pytest.mark.asyncio
async def test_calculator_basic(self):
"""测试基本计算"""
result = await handle_calculator({"expression": "2 + 2"})
assert not result.isError
assert "4" in result.content[0].text
@pytest.mark.asyncio
async def test_calculator_invalid(self):
"""测试无效表达式"""
result = await handle_calculator({"expression": "import os"})
assert result.isError
@pytest.mark.asyncio
async def test_get_time(self):
"""测试时间查询"""
result = await handle_get_time({"timezone": "UTC"})
assert not result.isError
assert "UTC" in result.content[0].text
@pytest.mark.asyncio
async def test_file_operations(self, tmp_path):
"""测试文件操作"""
test_file = tmp_path / "test.txt"
test_content = "Hello, World!"
# 测试写入
write_result = await handle_write_file({
"path": str(test_file),
"content": test_content
})
assert not write_result.isError
# 测试读取
read_result = await handle_read_file({
"path": str(test_file)
})
assert not read_result.isError
assert test_content in read_result.content[0].text
部署和分发
1. Python包分发
setup.py:
from setuptools import setup, find_packages
setup(
name="my-first-mcp-server",
version="0.1.0",
description="我的第一个MCP服务器",
author="Your Name",
author_email="your.email@example.com",
packages=find_packages(),
install_requires=[
"mcp>=1.0.0",
"fastapi>=0.104.0",
"uvicorn>=0.24.0",
],
entry_points={
"console_scripts": [
"my-mcp-server=src.server:main",
],
},
python_requires=">=3.8",
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
)
构建和发布:
# 构建包
python setup.py sdist bdist_wheel
# 发布到PyPI(测试)
twine upload --repository testpypi dist/*
# 发布到PyPI(正式)
twine upload dist/*
2. npm包分发
发布TypeScript包:
# 构建项目
npm run build
# 发布到npm(测试)
npm publish --registry https://registry.npmjs.org/ --tag beta
# 发布到npm(正式)
npm publish
3. Docker部署
Dockerfile:
# Python版本
FROM python:3.11-slim
WORKDIR /app
# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制源码
COPY src/ ./src/
# 设置入口点
CMD ["python", "src/server.py"]
docker-compose.yml:
version: '3.8'
services:
mcp-server:
build: .
container_name: my-mcp-server
restart: unless-stopped
environment:
- LOG_LEVEL=INFO
volumes:
- ./data:/app/data
networks:
- mcp-network
networks:
mcp-network:
driver: bridge
常见问题和解决方案
1. 连接问题
问题:客户端无法连接到服务器
解决方案:
# 检查服务器是否正常启动
import subprocess
import sys
def check_server_health():
"""检查服务器健康状态"""
try:
result = subprocess.run(
[sys.executable, "src/server.py", "--health-check"],
capture_output=True,
text=True,
timeout=10
)
return result.returncode == 0
except subprocess.TimeoutExpired:
return False
except Exception:
return False
if not check_server_health():
print("服务器启动失败,请检查配置")
2. 性能问题
问题:工具调用响应缓慢
解决方案:
# 添加缓存机制
from functools import lru_cache
import asyncio
class ToolCache:
"""工具结果缓存"""
def __init__(self, max_size=100, ttl=300):
self.cache = {}
self.max_size = max_size
self.ttl = ttl
async def get_or_compute(self, key, compute_func, *args, **kwargs):
"""获取缓存或计算新值"""
import time
now = time.time()
# 检查缓存
if key in self.cache:
value, timestamp = self.cache[key]
if now - timestamp < self.ttl:
return value
# 计算新值
result = await compute_func(*args, **kwargs)
# 更新缓存
if len(self.cache) >= self.max_size:
# 删除最旧的条目
oldest_key = min(self.cache.keys(),
key=lambda k: self.cache[k][1])
del self.cache[oldest_key]
self.cache[key] = (result, now)
return result
# 使用缓存
tool_cache = ToolCache()
@server.call_tool()
async def handle_call_tool_cached(name: str, arguments: Dict[str, Any]):
"""带缓存的工具调用"""
cache_key = f"{name}:{hash(str(arguments))}"
return await tool_cache.get_or_compute(
cache_key,
handle_call_tool_original,
name,
arguments
)
3. 错误处理
问题:工具执行时出现未处理的异常
解决方案:
# 全局异常处理器
import traceback
from functools import wraps
def error_handler(func):
"""错误处理装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"工具执行错误: {e}")
logger.error(f"堆栈跟踪: {traceback.format_exc()}")
return CallToolResult(
content=[
TextContent(
type="text",
text=f"工具执行失败: {str(e)}\n请检查参数是否正确"
)
],
isError=True
)
return wrapper
# 应用到所有工具函数
@error_handler
async def handle_calculator(arguments: Dict[str, Any]):
# 工具实现...
pass
本章总结
在本章中,我们学习了:
核心内容
- 开发环境搭建:安装Node.js、Python、Git等基础工具
- MCP工具安装:CLI工具、Python SDK、TypeScript SDK
- 代码编辑器配置:VS Code和JetBrains IDEs的配置
- 第一个MCP服务器:Python和TypeScript两种实现
- 第一个MCP客户端:连接和调用服务器功能
- 测试和调试:单元测试、集成测试、调试工具
- 部署和分发:包管理、Docker部署
- 问题解决:常见问题的诊断和解决方案
关键技能
- 环境配置和工具安装
- MCP服务器开发(工具、资源、提示)
- MCP客户端开发和集成
- 错误处理和性能优化
- 测试驱动开发
- 部署和运维
最佳实践
- 使用虚拟环境隔离依赖
- 编写完整的测试用例
- 实现适当的错误处理
- 添加日志和监控
- 遵循安全最佳实践
- 编写清晰的文档
下一步学习
在下一章中,我们将深入学习工具开发与管理,包括: - 高级工具设计模式 - 工具参数验证和类型安全 - 工具组合和链式调用 - 工具性能优化 - 工具安全性考虑 - 工具版本管理和兼容性
通过本章的学习,你已经具备了MCP开发的基础技能,可以开始构建自己的MCP应用了!