3.5 MCP服务¶
学习目标¶
- 掌握天气MCP服务器
- 掌握票务MCP服务器
- 掌握订票MCP服务器
一、天气MCP服务器¶
mcp_weather_server天气 MCP 服务器,提供 weather_data 表的 SELECT 查询接口,返回 JSON 格式结果。
核心功能:
-
初始化 MySQL 数据库连接。
-
执行 SELECT 查询,返回 JSON 格式结果。
-
格式化日期和数值字段,确保 JSON 序列化兼容。
-
通过 FastAPI 提供 HTTP 接口,响应 MCP 工具调用。
1 格式编码¶
format.py中包含一个编码器方法和JSON编码器类。
目标:定义编码器方法,用于格式化单个对象;自定义 JSON 编码器,处理 MySQL 查询结果中的非标准类型。
功能:将 MySQL查询结果中的date、datetime、timedelta 和 Decimal 类型转换为 JSON 兼容的字符串或数值。
位置:SmartVoyage/utils/format.py
import json
from datetime import date, datetime, timedelta
from decimal import Decimal
def default_encoder(obj): # 定义编码器方法,用于格式化单个对象
if isinstance(obj, datetime): # 检查是否为datetime,返回带时间的格式化字符串
return obj.strftime('%Y-%m-%d %H:%M:%S')
if isinstance(obj, date): # 检查是否为date,返回日期格式化字符串
return obj.strftime('%Y-%m-%d')
if isinstance(obj, timedelta): # 检查是否为timedelta,转换为字符串
return str(obj)
if isinstance(obj, Decimal): # 检查是否为Decimal,转换为浮点数
return float(obj)
return obj # 否则返回原对象
# 定义自定义JSON编码器类,继承自json.JSONEncoder,用于处理非标准类型序列化
class DateEncoder(json.JSONEncoder):
def default(self, obj): # 重写default方法,处理序列化时的默认对象转换
if isinstance(obj, (date, datetime)): # 检查对象是否为date或datetime类型,对于datetime返回带时间的字符串,对于date返回日期字符串
return obj.strftime('%Y-%m-%d %H:%M:%S') if isinstance(obj, datetime) else obj.strftime('%Y-%m-%d')
if isinstance(obj, timedelta): # 检查对象是否为timedelta类型,将时间差转换为字符串
return str(obj)
if isinstance(obj, Decimal): # 检查对象是否为Decimal类型,将Decimal转换为浮点数以兼容JSON
return float(obj)
return super().default(obj) # 对于其他类型,调用父类默认方法
测试:
if __name__ == '__main__':
print(default_encoder(datetime(2025, 8, 11, 8, 0)))
print(default_encoder(date(2025, 8, 11)))
print(default_encoder(timedelta(days=1)))
print(default_encoder(Decimal('123.45')))
print('*'*80)
encoder = DateEncoder()
print(encoder.default(datetime(2025, 8, 11, 8, 0)))
print(encoder.default(date(2025, 8, 11)))
print(encoder.default(timedelta(days=1)))
print(encoder.default(Decimal('123.45')))
2 WeatherService类¶
目标:提供天气数据查询服务,响应代理的 SQL 请求。
功能:初始化 MySQL 连接,执行 SELECT 查询,格式化结果为 JSON。
位置:SmartVoyage/mcp_server/mcp_weather_server.py
import mysql.connector
import json
from datetime import date, datetime, timedelta
from decimal import Decimal
from mcp.server.fastmcp import FastMCP
from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
from SmartVoyage.utils.format import DateEncoder, default_encoder
conf = Config()
# 天气服务类
class WeatherService: # 定义天气服务类,封装数据库操作逻辑
def __init__(self):
# 连接数据库
self.conn = mysql.connector.connect(
host=conf.host,
user=conf.user,
password=conf.password,
database=conf.database
)
# 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串
def execute_query(self, sql: str) -> str:
try:
cursor = self.conn.cursor(dictionary=True)
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
# 格式化结果
for result in results: # 遍历每个结果字典
for key, value in result.items():
if isinstance(value, (date, datetime, timedelta, Decimal)): # 检查值是否为特殊类型
result[key] = default_encoder(value) # 使用自定义编码器格式化该值
# 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义
return json.dumps({"status": "success", "data": results} if results else {"status": "no_data", "message": "未找到天气数据,请确认城市和日期。"}, cls=DateEncoder, ensure_ascii=False)
except Exception as e:
logger.error(f"天气查询错误: {str(e)}")
# 返回错误JSON响应
return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)
测试:
if __name__ == "__main__":
service = WeatherService()
sql = "SELECT * FROM weather_data WHERE city='上海' limit 2"
print(service.execute_query(sql))
3 启动MCP服务器¶
create_weather_mcp_server()函数
目标:创建并启动天气 MCP 服务器。
功能:初始化 FastMCP,注册 query_weather 工具,启动 FastAPI 服务器,监听端口 6001。
# 创建天气MCP服务器
def create_weather_mcp_server():
# 创建FastMCP实例
weather_mcp = FastMCP(name="WeatherTools",
instructions="天气查询工具,基于 weather_data 表。",
log_level="ERROR",
host="127.0.0.1", port=8002)
# 实例化天气服务对象
service = WeatherService()
@weather_mcp.tool(
name="query_weather",
description="查询天气数据,输入 SQL,如 'SELECT * FROM weather_data WHERE city = \"北京\" AND fx_date = \"2025-07-30\"'"
)
def query_weather(sql: str) -> str:
logger.info(f"执行天气查询: {sql}")
return service.execute_query(sql)
# 打印服务器信息
logger.info("=== 天气MCP服务器信息 ===")
logger.info(f"名称: {weather_mcp.name}")
logger.info(f"描述: {weather_mcp.instructions}")
# 运行服务器
try:
print("服务器已启动,请访问 http://127.0.0.1:8002/mcp")
weather_mcp.run(transport="streamable-http") # 使用 streamable-http 传输方式
except Exception as e:
print(f"服务器启动失败: {e}")
客户端测试:
位置:SmartVoyage/test/test_weather_mcp_server.py
import asyncio
import json
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
# 定义服务器地址
server_url = "http://127.0.0.1:8002/mcp"
async def test_weather_mcp():
try:
# 启动 MCP server,通过streamable建立连接
async with streamablehttp_client(server_url) as (read, write, _):
# 使用读写通道创建 MCP 会话
async with ClientSession(read, write) as session:
try:
await session.initialize()
print("会话初始化成功,可以开始调用工具。")
# 从 session 自动获取 MCP server 提供的工具列表。
tools = await load_mcp_tools(session)
print(f"tools-->{tools}")
# 测试1: 查询指定日期天气
sql = "SELECT * FROM weather_data WHERE city = '北京' AND fx_date = '2025-10-28'"
result = await session.call_tool("query_weather", {"sql": sql})
result_data = json.loads(result) if isinstance(result, str) else result
print(f"指定日期天气结果:{result_data}")
# 测试2: 查询未来3天天气
sql_range = "SELECT * FROM weather_data WHERE city = '北京' AND fx_date BETWEEN '2025-10-28' AND '2025-10-30'"
result_range = await session.call_tool("query_weather", {"sql": sql_range})
result_range_data = json.loads(result_range) if isinstance(result_range, str) else result_range
print(f"天气范围查询结果:{result_range_data}")
except Exception as e:
print(f"天气 MCP 测试出错:{str(e)}")
except Exception as e:
print(f"连接或会话初始化时发生错误: {e}")
print("请确认服务端脚本已启动并运行在 http://127.0.0.1:8002/mcp")
if __name__ == "__main__":
asyncio.run(test_weather_mcp())
二、票务MCP服务器¶
mcp_ticket_server.py:票务 MCP 服务器,提供 train_tickets、flight_tickets 和 concert_tickets 表的 SELECT 查询接口,返回 JSON 格式结果。
核心功能:
- 初始化 MySQL 数据库连接。
- 执行 SELECT 查询,返回 JSON 格式结果。
- 格式化日期和数值字段,确保 JSON 序列化兼容。
- 通过 FastAPI 提供 HTTP 接口,响应 MCP 工具调用。
1 TicketService类¶
目标:提供票务数据查询服务,响应代理的 SQL 请求。
功能:初始化 MySQL 连接,执行 SELECT 查询,格式化结果为 JSON。
位置:SmartVoyage/mcp_server/mcp_ticket_server.py
import mysql.connector
import json
from datetime import date, datetime, timedelta
from decimal import Decimal
from mcp.server.fastmcp import FastMCP
from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
from SmartVoyage.utils.format import DateEncoder, default_encoder
conf = Config()
# 票务服务类
class TicketService: # 定义票务服务类,封装数据库操作逻辑
def __init__(self): # 初始化方法,建立数据库连接
# 连接数据库
self.conn = mysql.connector.connect(
host=conf.host,
user=conf.user,
password=conf.password,
database=conf.database
)
# 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串
def execute_query(self, sql: str) -> str:
try:
cursor = self.conn.cursor(dictionary=True)
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
# 格式化结果
for result in results: # 遍历每个结果字典
for key, value in result.items():
if isinstance(value, (date, datetime, timedelta, Decimal)): # 检查值是否为特殊类型
result[key] = default_encoder(value) # 使用自定义编码器格式化该值
# 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义
return json.dumps({"status": "success", "data": results} if results else {"status": "no_data",
"message": "未找到票务数据,请确认查询条件。"},
cls=DateEncoder, ensure_ascii=False)
except Exception as e:
logger.error(f"票务查询错误: {str(e)}")
# 返回错误JSON响应
return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)
测试:
if __name__ == "__main__":
service = TicketService()
sql = "SELECT * FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '北京' AND DATE(departure_time) = '2025-10-28' AND cabin_type = '公务舱'"
print(service.execute_query(sql))
2 启动MCP 服务器¶
create_ticket_mcp_server()函数
目标:创建并启动票务 MCP 服务器。
功能:初始化 FastMCP,注册 query_tickets 工具,启动 FastAPI 服务器,监听端口 6002。
# 创建票务MCP服务器
def create_ticket_mcp_server():
# 创建FastMCP实例
ticket_mcp = FastMCP(name="TicketTools",
instructions="票务查询工具,基于 train_tickets, flight_tickets, concert_tickets 表。只支持查询。",
log_level="ERROR",
host="127.0.0.1", port=8001)
# 实例化票务服务对象
service = TicketService()
@ticket_mcp.tool(
name="query_tickets",
description="查询票务数据,输入 SQL,如 'SELECT * FROM train_tickets WHERE departure_city = \"北京\" AND arrival_city = \"上海\"'"
)
def query_tickets(sql: str) -> str:
logger.info(f"执行票务查询: {sql}")
return service.execute_query(sql)
# 打印服务器信息
logger.info("=== 票务MCP服务器信息 ===")
logger.info(f"名称: {ticket_mcp.name}")
logger.info(f"描述: {ticket_mcp.instructions}")
# 运行服务器
try:
print("服务器已启动,请访问 http://127.0.0.1:8001/mcp")
ticket_mcp.run(transport="streamable-http") # 使用 streamable-http 传输方式
except Exception as e:
print(f"服务器启动失败: {e}")
客户端测试:
位置:SmartVoyage/test/test_ticket_mcp_server.py
import asyncio
import json
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
# 定义服务器地址
server_url = "http://127.0.0.1:8001/mcp"
async def test_ticket_mcp():
try:
# 启动 MCP server,通过streamable建立连接
async with streamablehttp_client(server_url) as (read, write, _):
# 使用读写通道创建 MCP 会话
async with ClientSession(read, write) as session:
try:
await session.initialize()
print("会话初始化成功,可以开始调用工具。")
# 从 session 自动获取 MCP server 提供的工具列表。
tools = await load_mcp_tools(session)
print(f"tools-->{tools}")
# 调用远程工具
# 测试1: 查询机票
sql_flights = "SELECT * FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '北京' AND DATE(departure_time) = '2025-10-28' AND cabin_type = '公务舱'"
result_flights = await session.call_tool("query_tickets", {"sql": sql_flights})
result_flights_data = json.loads(result_flights) if isinstance(result_flights, str) else result_flights
print(f"机票查询结果:{result_flights_data}")
# 测试2: 查询火车票
sql_trains = "SELECT * FROM train_tickets WHERE departure_city = '北京' AND arrival_city = '上海' AND DATE(departure_time) = '2025-10-22' AND seat_type = '二等座'"
result_trains = await session.call_tool("query_tickets", {"sql": sql_trains})
result_trains_data = json.loads(result_trains) if isinstance(result_trains, str) else result_trains
print(f"火车票查询结果:{result_trains_data}")
# 测试3: 查询演唱会票
sql_concerts = "SELECT * FROM concert_tickets WHERE city = '北京' AND artist = '刀郎' AND DATE(start_time) = '2025-10-31' AND ticket_type = '看台'"
result_concerts = await session.call_tool("query_tickets", {"sql": sql_concerts})
result_concerts_data = json.loads(result_concerts) if isinstance(result_concerts, str) else result_concerts
print(f"演唱会票查询结果:{result_concerts_data}")
except Exception as e:
print(f"票务 MCP 测试出错:{str(e)}")
except Exception as e:
print(f"连接或会话初始化时发生错误: {e}")
print("请确认服务端脚本已启动并运行在 http://127.0.0.1:8001/mcp")
if __name__ == "__main__":
asyncio.run(test_ticket_mcp())
三、订票MCP服务器¶
mcp_order_server.py:订票 MCP 服务器,通过调用API完成火车票、飞机票和演唱会票的预定。
核心功能:
- 火车票预定、飞机票预定、演出票预定。
- 通过 FastAPI 提供 HTTP 接口,响应 MCP 工具调用。
位置:SmartVoyage/mcp_server/mcp_order_server.py
from mcp.server.fastmcp import FastMCP
from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
conf = Config()
# 创建FastMCP实例
order_mcp = FastMCP(name="OrderTools",
instructions="票务预定工具,通过调用API完成火车票、飞机票和演唱会票的预定。",
log_level="ERROR",
host="127.0.0.1", port=8003)
@order_mcp.tool(
name="order_train",
description="根据时间、车次、座位类型、数量预定火车票"
)
def order_train(departure_date: str, train_number: str, seat_type: str, number: int) -> str:
'''
Args:
departure_date (str): 出发日期,如 '2025-10-30'
train_number (str): 火车车次,如 'G346'
seat_type (str): 座位类型,如 '二等座'
number (int): 订购张数
'''
logger.info(f"正在订购火车票: {departure_date}, {train_number}, {seat_type}, {number}")
logger.info(f"恭喜,火车票预定成功!")
return "恭喜,火车票预定成功!"
@order_mcp.tool(
name="order_flight",
description="根据时间、班次、座位类型、数量预定飞机票"
)
def order_flight(departure_date: str, flight_number: str, seat_type: str, number: int) -> str:
'''
Args:
departure_date (str): 出发日期,如 '2025-10-30'
flight_number (str): 飞机班次,如 'CA6557'
seat_type (str): 座位类型,如 '经济舱'
number (int): 订购张数
'''
logger.info(f"正在订购飞机票: {departure_date}, {flight_number}, {seat_type}, {number}")
logger.info(f"恭喜,飞机票预定成功!")
return "恭喜,飞机票预定成功!"
@order_mcp.tool(
name="order_concert",
description="根据时间、明星、场地、座位类型、数量预定演出票"
)
def order_concert(start_date: str, aritist: str, venue: str, seat_type: str, number: int) -> str:
'''
Args:
start_date (str): 开始日期,如 '2025-10-30'
aritist (str): 明星,如 '刀郎'
venue (str): 场地,如 '上海体育馆'
seat_type (str): 座位类型,如 '看台'
number (int): 订购张数
'''
logger.info(f"正在订购演出票: {start_date}, {aritist}, {venue}, {seat_type}, {number}")
logger.info(f"恭喜,演出票预定成功!")
return "恭喜,演出票预定成功!"
# 创建票务预定MCP服务器
def create_order_mcp_server():
# 打印服务器信息
logger.info("=== 票务预定MCP服务器信息 ===")
logger.info(f"名称: {order_mcp.name}")
logger.info(f"描述: {order_mcp.instructions}")
# 运行服务器
try:
print("服务器已启动,请访问 http://127.0.0.1:8003/mcp")
order_mcp.run(transport="streamable-http") # 使用 streamable-http 传输方式
except Exception as e:
print(f"服务器启动失败: {e}")
if __name__ == "__main__":
# 调用创建服务器函数
create_order_mcp_server()
客户端测试:
位置:SmartVoyage/test/test_order_mcp_server.py
import asyncio
import json
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
conf = Config()
# 初始化LLM
llm = ChatOpenAI(
model=conf.model_name,
base_url=conf.base_url,
api_key=conf.api_key,
temperature=0.1
)
async def order_tickets(query):
try:
# 启动 MCP server,通过streamable建立连接
async with streamablehttp_client("http://127.0.0.1:8003/mcp") as (read, write, _):
# 使用读写通道创建 MCP 会话
async with ClientSession(read, write) as session:
try:
await session.initialize()
# 从 session 自动获取 MCP server 提供的工具列表。
tools = await load_mcp_tools(session)
# print(f"tools-->{tools}")
# 创建 agent 的提示模板
prompt = ChatPromptTemplate.from_messages([
("system",
"你是一个票务预定助手,能够调用工具来完成火车票、飞机票或演出票的预定。你需要仔细分析工具需要的参数,然后从用户提供的信息中提取信息。如果用户提供的信息不足以提取到调用工具所有必要参数,则向用户追问,以获取该信息。不能自己编撰参数。"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
# 构建工具调用代理
agent = create_tool_calling_agent(llm, tools, prompt)
# 创建代理执行器
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# 代理调用
response = await agent_executor.ainvoke({"input": query})
return response['output']
except Exception as e:
logger.info(f"票务 MCP 测试出错:{str(e)}")
return f"票务 MCP 查询出错:{str(e)}"
except Exception as e:
logger.error(f"连接或会话初始化时发生错误: {e}")
return "连接或会话初始化时发生错误"
if __name__ == "__main__":
while True:
query = input("请输入查询:")
if query == "exit":
break
print(asyncio.run(order_tickets(query)))
本节小结¶
本节主要描述了smartVoyage项目用到的所有MCP服务。