跳转至

3.5 MCP服务

学习目标

  • 掌握天气MCP服务器
  • 掌握票务MCP服务器
  • 掌握订票MCP服务器

一、天气MCP服务器

mcp_weather_server天气 MCP 服务器,提供 weather_data 表的 SELECT 查询接口,返回 JSON 格式结果。

核心功能

  • 初始化 MySQL 数据库连接。

  • 执行 SELECT 查询,返回 JSON 格式结果。

  • 格式化日期和数值字段,确保 JSON 序列化兼容。

  • 通过 FastAPI 提供 HTTP 接口,响应 MCP 工具调用。

1 格式编码

format.py中包含一个编码器方法和JSON编码器类。

目标:定义编码器方法,用于格式化单个对象;自定义 JSON 编码器,处理 MySQL 查询结果中的非标准类型。

功能:将 MySQL查询结果中的date、datetime、timedelta 和 Decimal 类型转换为 JSON 兼容的字符串或数值。

位置:SmartVoyage/utils/format.py

import json
from datetime import date, datetime, timedelta
from decimal import Decimal


def default_encoder(obj):  # 定义编码器方法,用于格式化单个对象
    if isinstance(obj, datetime):  # 检查是否为datetime,返回带时间的格式化字符串
        return obj.strftime('%Y-%m-%d %H:%M:%S')
    if isinstance(obj, date):  # 检查是否为date,返回日期格式化字符串
        return obj.strftime('%Y-%m-%d')
    if isinstance(obj, timedelta):  # 检查是否为timedelta,转换为字符串
        return str(obj)
    if isinstance(obj, Decimal):  # 检查是否为Decimal,转换为浮点数
        return float(obj)
    return obj  # 否则返回原对象

# 定义自定义JSON编码器类,继承自json.JSONEncoder,用于处理非标准类型序列化
class DateEncoder(json.JSONEncoder):
    def default(self, obj):  # 重写default方法,处理序列化时的默认对象转换
        if isinstance(obj, (date, datetime)):  # 检查对象是否为date或datetime类型,对于datetime返回带时间的字符串,对于date返回日期字符串
            return obj.strftime('%Y-%m-%d %H:%M:%S') if isinstance(obj, datetime) else obj.strftime('%Y-%m-%d')
        if isinstance(obj, timedelta):  # 检查对象是否为timedelta类型,将时间差转换为字符串
            return str(obj)
        if isinstance(obj, Decimal):  # 检查对象是否为Decimal类型,将Decimal转换为浮点数以兼容JSON
            return float(obj)
        return super().default(obj)  # 对于其他类型,调用父类默认方法

测试

if __name__ == '__main__':
    print(default_encoder(datetime(2025, 8, 11, 8, 0)))
    print(default_encoder(date(2025, 8, 11)))
    print(default_encoder(timedelta(days=1)))
    print(default_encoder(Decimal('123.45')))
    print('*'*80)

    encoder = DateEncoder()
    print(encoder.default(datetime(2025, 8, 11, 8, 0)))
    print(encoder.default(date(2025, 8, 11)))
    print(encoder.default(timedelta(days=1)))
    print(encoder.default(Decimal('123.45')))

2 WeatherService类

目标:提供天气数据查询服务,响应代理的 SQL 请求。

功能:初始化 MySQL 连接,执行 SELECT 查询,格式化结果为 JSON。

位置:SmartVoyage/mcp_server/mcp_weather_server.py

import mysql.connector
import json
from datetime import date, datetime, timedelta
from decimal import Decimal
from mcp.server.fastmcp import FastMCP

from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
from SmartVoyage.utils.format import DateEncoder, default_encoder

conf = Config()

# 天气服务类
class WeatherService:  # 定义天气服务类,封装数据库操作逻辑
    def __init__(self):
        # 连接数据库
        self.conn = mysql.connector.connect(
            host=conf.host,
            user=conf.user,
            password=conf.password,
            database=conf.database
        )

    # 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串
    def execute_query(self, sql: str) -> str:
        try:
            cursor = self.conn.cursor(dictionary=True)
            cursor.execute(sql)
            results = cursor.fetchall()
            cursor.close()
            # 格式化结果
            for result in results:  # 遍历每个结果字典
                for key, value in result.items():
                    if isinstance(value, (date, datetime, timedelta, Decimal)):  # 检查值是否为特殊类型
                        result[key] = default_encoder(value)  # 使用自定义编码器格式化该值
            # 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义
            return json.dumps({"status": "success", "data": results} if results else {"status": "no_data", "message": "未找到天气数据,请确认城市和日期。"}, cls=DateEncoder, ensure_ascii=False)
        except Exception as e:
            logger.error(f"天气查询错误: {str(e)}")
            # 返回错误JSON响应
            return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)

测试

if __name__ == "__main__":
    service = WeatherService()
    sql = "SELECT * FROM weather_data WHERE city='上海' limit 2"
    print(service.execute_query(sql))

3 启动MCP服务器

create_weather_mcp_server()函数

目标:创建并启动天气 MCP 服务器。

功能:初始化 FastMCP,注册 query_weather 工具,启动 FastAPI 服务器,监听端口 6001。

# 创建天气MCP服务器
def create_weather_mcp_server():
    # 创建FastMCP实例
    weather_mcp = FastMCP(name="WeatherTools",
                         instructions="天气查询工具,基于 weather_data 表。",
                         log_level="ERROR",
                         host="127.0.0.1", port=8002)

    # 实例化天气服务对象
    service = WeatherService()

    @weather_mcp.tool(
        name="query_weather",
        description="查询天气数据,输入 SQL,如 'SELECT * FROM weather_data WHERE city = \"北京\" AND fx_date = \"2025-07-30\"'"
    )
    def query_weather(sql: str) -> str:
        logger.info(f"执行天气查询: {sql}")
        return service.execute_query(sql)

    # 打印服务器信息
    logger.info("=== 天气MCP服务器信息 ===")
    logger.info(f"名称: {weather_mcp.name}")
    logger.info(f"描述: {weather_mcp.instructions}")

    # 运行服务器
    try:
        print("服务器已启动,请访问 http://127.0.0.1:8002/mcp")
        weather_mcp.run(transport="streamable-http")  # 使用 streamable-http 传输方式
    except Exception as e:
        print(f"服务器启动失败: {e}")

客户端测试

位置:SmartVoyage/test/test_weather_mcp_server.py

import asyncio
import json

from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

# 定义服务器地址
server_url = "http://127.0.0.1:8002/mcp"

async def test_weather_mcp():
    try:
        # 启动 MCP server,通过streamable建立连接
        async with streamablehttp_client(server_url) as (read, write, _):
            # 使用读写通道创建 MCP 会话
            async with ClientSession(read, write) as session:
                try:
                    await session.initialize()
                    print("会话初始化成功,可以开始调用工具。")

                    # 从 session 自动获取 MCP server 提供的工具列表。
                    tools = await load_mcp_tools(session)
                    print(f"tools-->{tools}")

                    # 测试1: 查询指定日期天气
                    sql = "SELECT * FROM weather_data WHERE city = '北京' AND fx_date = '2025-10-28'"
                    result = await session.call_tool("query_weather", {"sql": sql})
                    result_data = json.loads(result) if isinstance(result, str) else result
                    print(f"指定日期天气结果:{result_data}")

                    # 测试2: 查询未来3天天气
                    sql_range = "SELECT * FROM weather_data WHERE city = '北京' AND fx_date BETWEEN '2025-10-28' AND '2025-10-30'"
                    result_range = await session.call_tool("query_weather", {"sql": sql_range})
                    result_range_data = json.loads(result_range) if isinstance(result_range, str) else result_range
                    print(f"天气范围查询结果:{result_range_data}")
                except Exception as e:
                    print(f"天气 MCP 测试出错:{str(e)}")
    except Exception as e:
        print(f"连接或会话初始化时发生错误: {e}")
        print("请确认服务端脚本已启动并运行在 http://127.0.0.1:8002/mcp")


if __name__ == "__main__":
    asyncio.run(test_weather_mcp())

二、票务MCP服务器

mcp_ticket_server.py:票务 MCP 服务器,提供 train_tickets、flight_tickets 和 concert_tickets 表的 SELECT 查询接口,返回 JSON 格式结果。

核心功能

  • 初始化 MySQL 数据库连接。
  • 执行 SELECT 查询,返回 JSON 格式结果。
  • 格式化日期和数值字段,确保 JSON 序列化兼容。
  • 通过 FastAPI 提供 HTTP 接口,响应 MCP 工具调用。

1 TicketService类

目标:提供票务数据查询服务,响应代理的 SQL 请求。

功能:初始化 MySQL 连接,执行 SELECT 查询,格式化结果为 JSON。

位置:SmartVoyage/mcp_server/mcp_ticket_server.py

import mysql.connector
import json
from datetime import date, datetime, timedelta
from decimal import Decimal
from mcp.server.fastmcp import FastMCP

from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger
from SmartVoyage.utils.format import DateEncoder, default_encoder

conf = Config()


# 票务服务类
class TicketService:  # 定义票务服务类,封装数据库操作逻辑
    def __init__(self):  # 初始化方法,建立数据库连接
        # 连接数据库
        self.conn = mysql.connector.connect(
            host=conf.host,
            user=conf.user,
            password=conf.password,
            database=conf.database
        )

    # 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串
    def execute_query(self, sql: str) -> str:
        try:
            cursor = self.conn.cursor(dictionary=True)
            cursor.execute(sql)
            results = cursor.fetchall()
            cursor.close()
            # 格式化结果
            for result in results:  # 遍历每个结果字典
                for key, value in result.items():
                    if isinstance(value, (date, datetime, timedelta, Decimal)):  # 检查值是否为特殊类型
                        result[key] = default_encoder(value)  # 使用自定义编码器格式化该值
            # 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义
            return json.dumps({"status": "success", "data": results} if results else {"status": "no_data",
                                                                                      "message": "未找到票务数据,请确认查询条件。"},
                              cls=DateEncoder, ensure_ascii=False)
        except Exception as e:
            logger.error(f"票务查询错误: {str(e)}")
            # 返回错误JSON响应
            return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)

测试

if __name__ == "__main__":
    service = TicketService()
    sql = "SELECT * FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '北京' AND DATE(departure_time) = '2025-10-28' AND cabin_type = '公务舱'"
    print(service.execute_query(sql))

2 启动MCP 服务器

create_ticket_mcp_server()函数

目标:创建并启动票务 MCP 服务器。

功能:初始化 FastMCP,注册 query_tickets 工具,启动 FastAPI 服务器,监听端口 6002。

# 创建票务MCP服务器
def create_ticket_mcp_server():
    # 创建FastMCP实例
    ticket_mcp = FastMCP(name="TicketTools",
                         instructions="票务查询工具,基于 train_tickets, flight_tickets, concert_tickets 表。只支持查询。",
                         log_level="ERROR",
                         host="127.0.0.1", port=8001)

    # 实例化票务服务对象
    service = TicketService()

    @ticket_mcp.tool(
        name="query_tickets",
        description="查询票务数据,输入 SQL,如 'SELECT * FROM train_tickets WHERE departure_city = \"北京\" AND arrival_city = \"上海\"'"
    )
    def query_tickets(sql: str) -> str:
        logger.info(f"执行票务查询: {sql}")
        return service.execute_query(sql)

    # 打印服务器信息
    logger.info("=== 票务MCP服务器信息 ===")
    logger.info(f"名称: {ticket_mcp.name}")
    logger.info(f"描述: {ticket_mcp.instructions}")

    # 运行服务器
    try:
        print("服务器已启动,请访问 http://127.0.0.1:8001/mcp")
        ticket_mcp.run(transport="streamable-http")  # 使用 streamable-http 传输方式
    except Exception as e:
        print(f"服务器启动失败: {e}")

客户端测试

位置:SmartVoyage/test/test_ticket_mcp_server.py

import asyncio
import json

from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

# 定义服务器地址
server_url = "http://127.0.0.1:8001/mcp"

async def test_ticket_mcp():
    try:
        # 启动 MCP server,通过streamable建立连接
        async with streamablehttp_client(server_url) as (read, write, _):
            # 使用读写通道创建 MCP 会话
            async with ClientSession(read, write) as session:
                try:
                    await session.initialize()
                    print("会话初始化成功,可以开始调用工具。")

                    # 从 session 自动获取 MCP server 提供的工具列表。
                    tools = await load_mcp_tools(session)
                    print(f"tools-->{tools}")

                    # 调用远程工具
                    # 测试1: 查询机票
                    sql_flights = "SELECT * FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '北京' AND DATE(departure_time) = '2025-10-28' AND cabin_type = '公务舱'"
                    result_flights = await session.call_tool("query_tickets", {"sql": sql_flights})
                    result_flights_data = json.loads(result_flights) if isinstance(result_flights, str) else result_flights
                    print(f"机票查询结果:{result_flights_data}")

                    # 测试2: 查询火车票
                    sql_trains = "SELECT * FROM train_tickets WHERE departure_city = '北京' AND arrival_city = '上海' AND DATE(departure_time) = '2025-10-22' AND seat_type = '二等座'"
                    result_trains = await session.call_tool("query_tickets", {"sql": sql_trains})
                    result_trains_data = json.loads(result_trains) if isinstance(result_trains, str) else result_trains
                    print(f"火车票查询结果:{result_trains_data}")

                    # 测试3: 查询演唱会票
                    sql_concerts = "SELECT * FROM concert_tickets WHERE city = '北京' AND artist = '刀郎' AND DATE(start_time) = '2025-10-31' AND ticket_type = '看台'"
                    result_concerts = await session.call_tool("query_tickets", {"sql": sql_concerts})
                    result_concerts_data = json.loads(result_concerts) if isinstance(result_concerts, str) else result_concerts
                    print(f"演唱会票查询结果:{result_concerts_data}")
                except Exception as e:
                    print(f"票务 MCP 测试出错:{str(e)}")
    except Exception as e:
        print(f"连接或会话初始化时发生错误: {e}")
        print("请确认服务端脚本已启动并运行在 http://127.0.0.1:8001/mcp")


if __name__ == "__main__":
    asyncio.run(test_ticket_mcp())

三、订票MCP服务器

mcp_order_server.py:订票 MCP 服务器,通过调用API完成火车票、飞机票和演唱会票的预定。

核心功能

  • 火车票预定、飞机票预定、演出票预定。
  • 通过 FastAPI 提供 HTTP 接口,响应 MCP 工具调用。

位置:SmartVoyage/mcp_server/mcp_order_server.py

from mcp.server.fastmcp import FastMCP

from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger

conf = Config()

# 创建FastMCP实例
order_mcp = FastMCP(name="OrderTools",
                    instructions="票务预定工具,通过调用API完成火车票、飞机票和演唱会票的预定。",
                    log_level="ERROR",
                    host="127.0.0.1", port=8003)


@order_mcp.tool(
    name="order_train",
    description="根据时间、车次、座位类型、数量预定火车票"
)
def order_train(departure_date: str, train_number: str, seat_type: str, number: int) -> str:
    '''
    Args:
        departure_date (str): 出发日期,如 '2025-10-30'
        train_number (str): 火车车次,如 'G346'
        seat_type (str): 座位类型,如 '二等座'
        number (int): 订购张数
    '''
    logger.info(f"正在订购火车票: {departure_date}, {train_number}, {seat_type}, {number}")
    logger.info(f"恭喜,火车票预定成功!")
    return "恭喜,火车票预定成功!"

@order_mcp.tool(
    name="order_flight",
    description="根据时间、班次、座位类型、数量预定飞机票"
)
def order_flight(departure_date: str, flight_number: str, seat_type: str, number: int) -> str:
    '''
    Args:
        departure_date (str): 出发日期,如 '2025-10-30'
        flight_number (str): 飞机班次,如 'CA6557'
        seat_type (str): 座位类型,如 '经济舱'
        number (int): 订购张数
    '''
    logger.info(f"正在订购飞机票: {departure_date}, {flight_number}, {seat_type}, {number}")
    logger.info(f"恭喜,飞机票预定成功!")
    return "恭喜,飞机票预定成功!"


@order_mcp.tool(
    name="order_concert",
    description="根据时间、明星、场地、座位类型、数量预定演出票"
)
def order_concert(start_date: str, aritist: str, venue: str, seat_type: str, number: int) -> str:
    '''
    Args:
        start_date (str): 开始日期,如 '2025-10-30'
        aritist (str): 明星,如 '刀郎'
        venue (str): 场地,如 '上海体育馆'
        seat_type (str): 座位类型,如 '看台'
        number (int): 订购张数
    '''
    logger.info(f"正在订购演出票: {start_date}, {aritist}, {venue}, {seat_type}, {number}")
    logger.info(f"恭喜,演出票预定成功!")
    return "恭喜,演出票预定成功!"


# 创建票务预定MCP服务器
def create_order_mcp_server():
    # 打印服务器信息
    logger.info("=== 票务预定MCP服务器信息 ===")
    logger.info(f"名称: {order_mcp.name}")
    logger.info(f"描述: {order_mcp.instructions}")

    # 运行服务器
    try:
        print("服务器已启动,请访问 http://127.0.0.1:8003/mcp")
        order_mcp.run(transport="streamable-http")  # 使用 streamable-http 传输方式
    except Exception as e:
        print(f"服务器启动失败: {e}")


if __name__ == "__main__":
    # 调用创建服务器函数
    create_order_mcp_server()

客户端测试

位置:SmartVoyage/test/test_order_mcp_server.py

import asyncio
import json

from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger

conf = Config()

# 初始化LLM
llm = ChatOpenAI(
    model=conf.model_name,
    base_url=conf.base_url,
    api_key=conf.api_key,
    temperature=0.1
)


async def order_tickets(query):
    try:
        # 启动 MCP server,通过streamable建立连接
        async with streamablehttp_client("http://127.0.0.1:8003/mcp") as (read, write, _):
            # 使用读写通道创建 MCP 会话
            async with ClientSession(read, write) as session:
                try:
                    await session.initialize()

                    # 从 session 自动获取 MCP server 提供的工具列表。
                    tools = await load_mcp_tools(session)
                    # print(f"tools-->{tools}")

                    # 创建 agent 的提示模板
                    prompt = ChatPromptTemplate.from_messages([
                        ("system",
                         "你是一个票务预定助手,能够调用工具来完成火车票、飞机票或演出票的预定。你需要仔细分析工具需要的参数,然后从用户提供的信息中提取信息。如果用户提供的信息不足以提取到调用工具所有必要参数,则向用户追问,以获取该信息。不能自己编撰参数。"),
                        ("human", "{input}"),
                        ("placeholder", "{agent_scratchpad}"),
                    ])

                    # 构建工具调用代理
                    agent = create_tool_calling_agent(llm, tools, prompt)

                    # 创建代理执行器
                    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

                    # 代理调用
                    response = await agent_executor.ainvoke({"input": query})

                    return response['output']
                except Exception as e:
                    logger.info(f"票务 MCP 测试出错:{str(e)}")
                    return f"票务 MCP 查询出错:{str(e)}"
    except Exception as e:
        logger.error(f"连接或会话初始化时发生错误: {e}")
        return "连接或会话初始化时发生错误"


if __name__ == "__main__":
    while True:
        query = input("请输入查询:")
        if query == "exit":
            break
        print(asyncio.run(order_tickets(query)))

本节小结

本节主要描述了smartVoyage项目用到的所有MCP服务。