跳转至

3.6 Agent服务

学习目标

  • 掌握天气Agent服务器的用法
  • 掌握票务Agent服务器的用法
  • 掌握订票Agent服务器的用法

一、天气Agent服务器

weather_server.py:天气代理服务器,使用 LLM 生成 SQL 查询 MCP 票务工具,返回用户友好文本结果。

作用:处理用户自然语言查询,转为 SQL 调用 MCP,提升智能性,支持追问和默认值。

项目中的定位:执行层,接收路由任务,生成 SQL 调用 MCP,返回 artifacts 给客户端。

核心功能

  • 初始化 LLM 和 MCP 客户端。

  • 生成 SQL,提取代码块,调用 MCP。

  • 解析 JSON 结果,返回格式化文本。

1 导包与配置

位置:SmartVoyage/a2a_server/weather_server.py

import json
import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

from SmartVoyage.config import Config
from datetime import datetime
import pytz

from SmartVoyage.create_logger import logger

conf = Config()

# 初始化LLM
llm = ChatOpenAI(
    model=conf.model_name,
    base_url=conf.base_url,
    api_key=conf.api_key,
    temperature=0.1
)

# 数据表 schema
table_schema_string = """  # 定义天气数据表的SQL schema字符串,用于Prompt上下文
CREATE TABLE IF NOT EXISTS weather_data (
id INT AUTO_INCREMENT PRIMARY KEY,
city VARCHAR(50) NOT NULL COMMENT '城市名称',
fx_date DATE NOT NULL COMMENT '预报日期',
sunrise TIME COMMENT '日出时间',
sunset TIME COMMENT '日落时间',
moonrise TIME COMMENT '月升时间',
moonset TIME COMMENT '月落时间',
moon_phase VARCHAR(20) COMMENT '月相名称',
moon_phase_icon VARCHAR(10) COMMENT '月相图标代码',
temp_max INT COMMENT '最高温度',
temp_min INT COMMENT '最低温度',
icon_day VARCHAR(10) COMMENT '白天天气图标代码',
text_day VARCHAR(20) COMMENT '白天天气描述',
icon_night VARCHAR(10) COMMENT '夜间天气图标代码',
text_night VARCHAR(20) COMMENT '夜间天气描述',
wind360_day INT COMMENT '白天风向360角度',
wind_dir_day VARCHAR(20) COMMENT '白天风向',
wind_scale_day VARCHAR(10) COMMENT '白天风力等级',
wind_speed_day INT COMMENT '白天风速 (km/h)',
wind360_night INT COMMENT '夜间风向360角度',
wind_dir_night VARCHAR(20) COMMENT '夜间风向',
wind_scale_night VARCHAR(10) COMMENT '夜间风力等级',
wind_speed_night INT COMMENT '夜间风速 (km/h)',
precip DECIMAL(5,1) COMMENT '降水量 (mm)',
uv_index INT COMMENT '紫外线指数',
humidity INT COMMENT '相对湿度 (%)',
pressure INT COMMENT '大气压强 (hPa)',
vis INT COMMENT '能见度 (km)',
cloud INT COMMENT '云量 (%)',
update_time DATETIME COMMENT '数据更新时间',
UNIQUE KEY unique_city_date (city, fx_date)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='天气数据表';
"""

# 生成SQL的提示词
sql_prompt = ChatPromptTemplate.from_template(
    """
系统提示:你是一个专业的天气SQL生成器,需要从对话历史(含用户的问题)中提取关键信息,然后基于weather_data表生成SELECT语句。
- 如果用户需要查天气,则至少需要城市和时间信息。如果对话历史中缺乏必要的信息,可以向其追问,输出格式为json格式,如示例所示;如果对话历史中信息齐全,则输出纯SQL即可。
- 如果用户问与天气无关的问题,则模仿最后2个示例回复即可。


示例:
- 对话: user: 北京 2025-07-30
输出: SELECT city, fx_date, temp_max, temp_min, text_day, text_night, humidity, wind_dir_day, precip FROM weather_data WHERE city = '北京' AND fx_date = '2025-07-30'
- 对话: user: 上海未来3天的天气
输出: SELECT city, fx_date, temp_max, temp_min, text_day, text_night, humidity, wind_dir_day, precip FROM weather_data WHERE city = '上海' AND fx_date BETWEEN '2025-07-30' AND '2025-08-01' ORDER BY fx_date
- 对话: user: 北京的天气
输出: {{"status": "input_required", "message": "请提供具体的需要查询的日期,例如 '2025-07-30'。"}}
- 对话: user: 今天\nassistant: 请提供城市。\nuser: 北京
输出: SELECT city, fx_date, temp_max, temp_min, text_day, text_night, humidity, wind_dir_day, precip FROM weather_data WHERE city = '北京' AND fx_date = '2025-07-30'
- 对话: user: 北京明天的天气\nassistant: 多云。\nuser: 后天呢
输出: SELECT city, fx_date, temp_max, temp_min, text_day, text_night, humidity, wind_dir_day, precip FROM weather_data WHERE city = '北京' AND fx_date = '2025-08-01'
- 对话: user: 你好
输出: {{"status": "input_required", "message": "请提供城市和日期,例如 '北京 2025-07-30'。"}}
- 对话: user: 今天有什么好吃的
输出: {{"status": "input_required", "message": "请提供天气相关查询,包括城市和日期。"}}

weather_data表结构:{table_schema_string}
对话历史: {conversation}
当前日期: {current_date} (Asia/Shanghai)
    """
)

2 查询函数

# 定义查询函数
async def get_weather(sql):
    try:
        # 启动 MCP server,通过streamable建立连接
        async with streamablehttp_client("http://127.0.0.1:8002/mcp") as (read, write, _):
            # 使用读写通道创建 MCP 会话
            async with ClientSession(read, write) as session:
                try:
                    await session.initialize()
                    # 工具调用
                    result = await session.call_tool("query_weather", {"sql": sql})
                    result_data = json.loads(result) if isinstance(result, str) else result
                    logger.info(f"天气查询结果:{result_data}")
                    return result_data.content[0].text
                except Exception as e:
                    logger.error(f"天气 MCP 测试出错:{str(e)}")
                    return {"status": "error", "message": f"天气 MCP 查询出错:{str(e)}"}
    except Exception as e:
        logger.error(f"连接或会话初始化时发生错误: {e}")
        return {"status": "error", "message": "连接或会话初始化时发生错误"}

3 AgentCard定义

# Agent卡片定义
agent_card = AgentCard(
    name="WeatherQueryAssistant",
    description="基于LangChain提供天气查询服务的助手",
    url="http://localhost:5005",
    version="1.0.0",
    capabilities={"streaming": True, "memory": True},  # 设置能力:支持流式和内存
    skills=[  # 定义技能列表
        AgentSkill(
            name="execute weather query",
            description="执行天气查询,返回天气数据库结果,支持自然语言输入",
            examples=["北京 2025-07-30 天气", "上海未来5天", "今天天气如何"]
        )
    ]
)

4 WeatherQueryServer-基础

# 天气查询服务器类
class WeatherQueryServer(A2AServer):
    def __init__(self):
        super().__init__(agent_card=agent_card)
        self.llm = llm
        self.sql_prompt = sql_prompt
        self.schema = table_schema_string

    # 定义生成SQL查询方法,输入对话历史,返回SQL或追问JSON
    def generate_sql_query(self, conversation: str) -> dict:
        try:
            # 组装链
            chain = self.sql_prompt | self.llm
            # 调用链
            current_date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d')  # 获取当前日期,格式化为字符串
            output = chain.invoke({"conversation": conversation, "current_date": current_date, "table_schema_string": self.schema}).content.strip()
            logger.info(f"原始 LLM 输出: {output}")
            # 处理结果,返回字典
            if output.startswith('{'):  # 检查输出是否以JSON开头
                return json.loads(output)
            return {"status": "sql", "sql": output}
        except Exception as e:
            logger.error(f"SQL生成失败: {str(e)}")
            return {"status": "input_required", "message": "查询无效,请提供城市和日期。"}  # 返回追问JSON

5 WeatherQueryServer-处理

    # 处理任务:提取输入,生成SQL,调用MCP,格式化结果
    def handle_task(self, task):
        # 1 提取输入
        content = (task.message or {}).get("content", {})  # 从消息中获取内容
        # 提取conversation,即客户端发起的任务中的query语句
        conversation = content.get("text", "") if isinstance(content, dict) else ""
        logger.info(f"对话历史及用户问题: {conversation}")

        try:
            # 2 基于用户问题生成SQL查询
            gen_result = self.generate_sql_query(conversation)
            # 检查是否需要追问,如果是则添加追问消息后返回任务
            if gen_result["status"] == "input_required":
                # 追问逻辑,这里是指在无法正常生成sql时,设置任务状态为输入所需,添加追问消息
                task.status = TaskStatus(state=TaskState.INPUT_REQUIRED,
                                         message={"role": "agent", "content": {"text": gen_result["message"]}})
                return task

            # 否则则提取SQL查询,并进行MCP调用
            sql_query = gen_result["sql"]  #
            logger.info(f"生成的SQL查询: {sql_query}")

            # 3 调用MCP
            weather_result = asyncio.run(get_weather(sql_query))

            # 4 格式化结果
            response = json.loads(weather_result) if isinstance(weather_result, str) else weather_result
            logger.info(f"MCP 返回: {response}")
            # 检查响应状态
            if response.get("status") == "success":
                data = response.get("data", [])  # 提取数据列表
                response_text = "\n".join([f"{d['city']} {d['fx_date']}: {d['text_day']}(夜间 {d['text_night']}),温度 {d['temp_min']}-{d['temp_max']}°C,湿度 {d['humidity']}%,风向 {d['wind_dir_day']},降水 {d['precip']}mm" for d in data])  # 格式化每个数据项为友好文本,连接成多行

                # 设置任务产物为文本部分,并设置任务状态为完成
                task.artifacts = [{"parts": [{"type": "text", "text": response_text}]}]
                task.status = TaskStatus(state=TaskState.COMPLETED)
            elif response.get("status") == "no_data":
                response_text = response.get("message", "请重新输入查询的城市和日期。")

                # 设置任务状态为输入所需,添加追问消息
                task.status = TaskStatus(state=TaskState.INPUT_REQUIRED,
                                         message={"role": "agent", "content": {"text": response_text}})
            else:
                response_text = response.get("message", "查询失败,请重试或提供更多细节。")

                # 设置任务状态为失败,添加错误信息
                task.status = TaskStatus(state=TaskState.FAILED,
                                         message={"role": "agent", "content": {"text": response_text}})

            return task
        except Exception as e:  # 捕获异常
            logger.error(f"查询失败: {str(e)}")

            # 设置任务状态为失败,添加错误信息
            task.status = TaskStatus(state=TaskState.FAILED,
                                     message={"role": "agent",
                                              "content": {"text": f"查询失败: {str(e)} 请重试或提供更多细节。"}})
            return task

6 运行天气Agent服务器

if __name__ == "__main__":
    # 创建并运行服务器
    # 实例化天气查询服务器
    weather_server = WeatherQueryServer()
    # 打印服务器信息
    print("\n=== 服务器信息 ===")
    print(f"名称: {weather_server.agent_card.name}")
    print(f"描述: {weather_server.agent_card.description}")
    print("\n技能:")
    for skill in weather_server.agent_card.skills:
        print(f"- {skill.name}: {skill.description}")
    # 运行服务器
    run_server(weather_server, host="127.0.0.1", port=5005)

7 测试

位置:SmartVoyage/test/test_weather_agent_server.py

import asyncio
import uuid

from python_a2a import A2AClient, Message, TextContent, MessageRole, Task
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger

conf = Config()

# 初始化 LLM
llm = ChatOpenAI(
            model=conf.model_name,
            api_key=conf.api_key,
            base_url=conf.base_url,
            temperature=0.1
        )

# 天气总结提示模板
weather_prompt = ChatPromptTemplate.from_template(
    """
    您是一位天气解说员,以生动、引人入胜的风格为用户介绍天气查询结果。基于以下查询结果,生成一段总结:
    - 突出城市、日期、温度、天气描述和湿度,提及关键亮点如适宜活动。
    - 使用解说员的语气,例如“欢迎来到天气宝库!今天我们为您带来...”。
    - 如果结果为空,建议用户尝试其他查询条件。
    - 保持中文叙述,字数控制在 100-150 字。

    查询结果:
    {weather}

    总结:
    """
)

def main():
    # 初始化天气客户端
    weather_client = A2AClient("http://localhost:5005")

    # 获取天气代理信息
    try:
        logger.info("获取天气助手信息")
        logger.info(f"名称: {weather_client.agent_card.name}")
        logger.info(f"描述: {weather_client.agent_card.description}")
        logger.info(f"版本: {weather_client.agent_card.version}")
        if weather_client.agent_card.skills:
            logger.info("支持技能:")
            for skill in weather_client.agent_card.skills:
                logger.info(f"- {skill.name}: {skill.description}")
                if skill.examples:
                    logger.info(f"  示例: {', '.join(skill.examples)}")
    except Exception as e:
        logger.error(f"无法获取天气助手信息: {str(e)}")

    # 交互循环
    while True:
        user_input = input("输入您的天气查询(输入 'exit' 退出):")
        if user_input.lower() == 'exit':
            break

        try:
            query = user_input.strip()
            logger.info(f"用户查询 (天气): {query}")

            # 发送查询
            logger.info("正在查询数据...")
            message_weather = Message(content=TextContent(text=query), role=MessageRole.USER)
            task_weather = Task(id="task-" + str(uuid.uuid4()), message=message_weather.to_dict())

            weather_result_task = asyncio.run(weather_client.send_task_async(task_weather))
            logger.info(f"原始响应: {weather_result_task}")

            # 生成 LLM 总结
            if weather_result_task.status.state == 'completed':
                try:
                    summary_chain = weather_prompt | llm
                    weather_result = weather_result_task.artifacts[0]["parts"][0]["text"]
                    summary = summary_chain.invoke({"weather": weather_result}).content.strip()
                    logger.info(f"**天气解说员总结**:\n{summary}")
                except Exception as e:
                    error_message = f"生成总结失败: {str(e)}"
                    logger.error(error_message)
            else:
                logger.info(weather_result_task.status.message['content']['text'])
        except Exception as e:
            error_message = f"查询失败: {str(e)}"
            logger.error(error_message)


if __name__ == "__main__":
    print("天气agent server查询客户端测试脚本")
    main()

二、票务Agent服务器

ticket_server.py:票务代理服务器,使用 LLM 生成 SQL 查询 MCP 票务工具,返回用户友好文本结果。

作用:处理用户自然语言查询,转为 SQL 调用 MCP,提升智能性,支持追问和默认值。

项目中的定位:执行层,接收路由任务,生成 SQL 调用 MCP,返回 artifacts 给客户端。

核心功能

  • 初始化 LLM 和 MCP 客户端。
  • 生成 SQL,提取代码块,调用 MCP。
  • 解析 JSON 结果,返回格式化文本。

1 导包与配置

位置:SmartVoyage/a2a_server/ticket_server.py

import json
import asyncio

from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from datetime import datetime
import pytz

from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger

conf = Config()

# 初始化LLM
llm = ChatOpenAI(
    model=conf.model_name,
    base_url=conf.base_url,
    api_key=conf.api_key,
    temperature=0.1
)


# 数据表 schema
table_schema_string = """  # 定义票务表SQL schema字符串,用于Prompt上下文
CREATE TABLE train_tickets (
    id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
    departure_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '出发城市(如“北京”)',
    arrival_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '到达城市(如“上海”)',
    departure_time DATETIME NOT NULL COMMENT '出发时间(如“2025-08-12 07:00:00”)',
    arrival_time DATETIME NOT NULL COMMENT '到达时间(如“2025-08-12 11:30:00”)',
    train_number VARCHAR(20) NOT NULL COMMENT '火车车次(如“G1001”)',
    seat_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '座位类型(如“二等座”)',
    total_seats INT NOT NULL COMMENT '总座位数(如 1000)',
    remaining_seats INT NOT NULL COMMENT '剩余座位数(如 50)',
    price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 553.50)',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
    UNIQUE KEY unique_train (departure_time, train_number)
) COMMENT='火车票信息表';

-- 机票表
CREATE TABLE flight_tickets (
    id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
    departure_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '出发城市(如“北京”)',
    arrival_city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '到达城市(如“上海”)',
    departure_time DATETIME NOT NULL COMMENT '出发时间(如“2025-08-12 08:00:00”)',
    arrival_time DATETIME NOT NULL COMMENT '到达时间(如“2025-08-12 10:30:00”)',
    flight_number VARCHAR(20) NOT NULL COMMENT '航班号(如“CA1234”)',
    cabin_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '舱位类型(如“经济舱”)',
    total_seats INT NOT NULL COMMENT '总座位数(如 200)',
    remaining_seats INT NOT NULL COMMENT '剩余座位数(如 10)',
    price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 1200.00)',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
    UNIQUE KEY unique_flight (departure_time, flight_number)
) COMMENT='航班机票信息表';

-- 演唱会票表
CREATE TABLE concert_tickets (
    id INT AUTO_INCREMENT PRIMARY KEY COMMENT '主键,自增,唯一标识每条记录',
    artist VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '艺人名称(如“周杰伦”)',
    city VARCHAR(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '举办城市(如“上海”)',
    venue VARCHAR(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '场馆(如“上海体育场”)',
    start_time DATETIME NOT NULL COMMENT '开始时间(如“2025-08-12 19:00:00”)',
    end_time DATETIME NOT NULL COMMENT '结束时间(如“2025-08-12 22:00:00”)',
    ticket_type VARCHAR(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '票类型(如“VIP”)',
    total_seats INT NOT NULL COMMENT '总座位数(如 5000)',
    remaining_seats INT NOT NULL COMMENT '剩余座位数(如 100)',
    price DECIMAL(10, 2) NOT NULL COMMENT '票价(如 880.00)',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间,自动记录插入时间',
    UNIQUE KEY unique_concert (start_time, artist, ticket_type)
) COMMENT='演唱会门票信息表';
"""

# 生成SQL的提示词
sql_prompt = ChatPromptTemplate.from_template(
    """
系统提示:你是一个专业的票务SQL生成器,需要从对话历史(含用户的问题)中提取用户的意图以及关键信息,然后基于train_tickets、flight_tickets、concert_tickets表生成SELECT语句。
根据对话历史:
1. 提取用户的意图,意图有3种(train: 火车/高铁, flight: 机票, concert: 演唱会),输出:{{"type": "train/flight/concert"}};如果无法识别意图,或者意图不在这3种内,则模仿最后1个示例回复即可。
2. 根据用户的意图,生成对应表的 SELECT 语句,仅查询指定字段:
- train_tickets: id, departure_city, arrival_city, departure_time, arrival_time, train_number, seat_type, price, remaining_seats
- flight_tickets: id, departure_city, arrival_city, departure_time, arrival_time, flight_number, cabin_type, price, remaining_seats
- concert_tickets: id, artist, city, venue, start_time, end_time, ticket_type, price, remaining_seats
3. 如果用户在查询票务信息时,缺少必要信息,则输出:{{"status": "input_required", "message": "请提供票务类型(如火车票、机票、演唱会)和必要信息(如城市、日期)。"}} ,如示例所示;如果对话历史中信息齐全,则输出纯SQL即可。
其中,每种意图必要的信息有:
- flight/train: 【departure_city (出发城市), arrival_city (到达城市), date (日期)】 或 【train_number/flight_number (车次)】
- concert: city (城市), artist (艺人), date (日期)。
4. 按要求输出两行数据或一行数据即可,不需要输出其他内容。


示例:
- 对话: user: 火车票 北京 上海 2025-07-31 硬卧
输出: 
{{"type": "train"}}
SELECT id, departure_city, arrival_city, departure_time, arrival_time, train_number, seat_type, price, remaining_seats FROM train_tickets WHERE departure_city = '北京' AND arrival_city = '上海' AND DATE(departure_time) = '2025-07-31' AND seat_type = '硬卧'

- 对话: user: 机票 上海 广州 2025-09-11 头等舱
输出: 
{{"type": "flight"}}
SELECT id, departure_city, arrival_city, departure_time, arrival_time, flight_number, cabin_type, price, remaining_seats FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '广州' AND DATE(departure_time) = '2025-09-11' AND cabin_type = '头等舱'

- 对话: user: 演唱会 北京 刀郎 2025-08-23 看台
输出: 
{{"type": "concert"}}
SELECT id, artist, city, venue, start_time, end_time, ticket_type, price, remaining_seats FROM concert_tickets WHERE city = '北京' AND artist = '刀郎' AND DATE(start_time) = '2025-08-23' AND ticket_type = '看台'

- 对话: user: 火车票
输出: 
{{"status": "input_required", "message": "请提供票务类型(如火车票、机票、演唱会)和必要信息(如城市、日期)。"}}

- 对话: user: 你好
输出: 
{{"status": "input_required", "message": "请提供票务类型(如火车票、机票、演唱会)和必要信息(如城市、日期)。"}} 

表结构:{table_schema_string}
对话历史: {conversation}
当前日期: {current_date} (Asia/Shanghai)
    """
)

2 查询函数

# 定义查询函数
async def get_ticket_info(sql):
    try:
        # 启动 MCP server,通过streamable建立连接
        async with streamablehttp_client("http://127.0.0.1:8001/mcp") as (read, write, _):
            # 使用读写通道创建 MCP 会话
            async with ClientSession(read, write) as session:
                try:
                    await session.initialize()
                    # 工具调用
                    result = await session.call_tool("query_tickets", {"sql": sql})
                    result_data = json.loads(result) if isinstance(result, str) else result
                    logger.info(f"票务查询结果:{result_data}")
                    return result_data.content[0].text
                except Exception as e:
                    logger.error(f"票务 MCP 测试出错:{str(e)}")
                    return {"status": "error", "message": f"票务 MCP 查询出错:{str(e)}"}
    except Exception as e:
        logger.error(f"连接或会话初始化时发生错误: {e}")
        return {"status": "error", "message": "连接或会话初始化时发生错误"}

3 AgentCard定义

# Agent 卡片定义
agent_card = AgentCard(
    name="TicketQueryAssistant",
    description="基于 LangChain 提供票务查询服务的助手",
    url="http://localhost:5006",
    version="1.0.4",
    capabilities={"streaming": True, "memory": True},
    skills=[
        AgentSkill(
            name="execute ticket query",
            description="根据客户端提供的输入执行票务查询,返回数据库结果,支持自然语言输入",
            examples=["火车票 北京 上海 2025-07-31 硬卧", "机票 北京 上海 2025-07-31 经济舱",
                      "演唱会 北京 刀郎 2025-08-23 看台"]
        )
    ]
)

4 TicketQueryServer-基础

# 票务查询服务器类
class TicketQueryServer(A2AServer):
    def __init__(self):
        super().__init__(agent_card=agent_card)
        self.llm = llm
        self.sql_prompt = sql_prompt
        self.schema = table_schema_string

    # 定义生成SQL查询方法,输入对话历史,返回SQL或追问JSON
    def generate_sql_query(self, conversation: str) -> dict:
        try:
            # 组装链
            chain = self.sql_prompt | self.llm
            # 调用链
            current_date = datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d')  # 获取当前日期,格式化为字符串
            output = chain.invoke({"conversation": conversation, "current_date": current_date, "table_schema_string": self.schema}).content.strip()
            logger.info(f"原始 LLM 输出: {output}")

            # 处理结果,返回字典
            lines = output.split('\n')
            type_line = lines[0].strip()
            if type_line.startswith('```json'):  # 检查是否以```json开头
                type_line = lines[1].strip()  # 取下一行为类型行
                sql_lines = lines[3:-1] if lines[-1].strip() == '```' else lines[3:]  # 提取SQL行,跳过代码块标记
            else:
                sql_lines = lines[1:] if len(lines) > 1 else []  # 取剩余行为SQL行

            # 提取 type 和 SQL
            if type_line.startswith('{"type":'):  # 如果以{"type":开头
                query_type = json.loads(type_line)["type"]  # 解析并提取类型
                sql_query = ' '.join([line.strip() for line in sql_lines if line.strip() and not line.startswith('```')])  # 连接SQL行,过滤空行和代码块
                logger.info(f"分类类型: {query_type}, 生成的 SQL: {sql_query}")
                return {"status": "sql", "type": query_type, "sql": sql_query}  # 返回SQL状态字典,包括类型
            elif type_line.startswith('{"status": "input_required"'):  # 检查是否为追问JSON
                return json.loads(type_line)
            else:  # 无效格式
                logger.error(f"无效的 LLM 输出格式: {output}")
                return {"status": "input_required", "message": "无法解析查询类型或SQL,请提供更明确的信息。"}  # 返回默认追问
        except Exception as e:
            logger.error(f"SQL 生成失败: {str(e)}")
            return {"status": "input_required", "message": "查询无效,请提供查询票务的相关信息。"}  # 返回追问JSON

5 TicketQueryServer-处理

    # 处理任务:提取输入,生成SQL,调用MCP,格式化结果
    def handle_task(self, task):
        # 1 提取输入
        content = (task.message or {}).get("content", {})  # 从消息中获取内容
        # 提取conversation,即客户端发起的任务中的query语句
        conversation = content.get("text", "") if isinstance(content, dict) else ""
        logger.info(f"对话历史及用户问题: {conversation}")

        try:
            # 2 基于用户问题生成SQL查询
            gen_result = self.generate_sql_query(conversation)
            # 检查是否需要追问,如果是则添加追问消息后返回任务
            if gen_result["status"] == "input_required":
                task.status = TaskStatus(state=TaskState.INPUT_REQUIRED,
                                         message={"role": "agent", "content": {"text": gen_result["message"]}})
                return task

            # 否则则提取SQL查询,并进行MCP调用
            sql_query = gen_result["sql"]
            query_type = gen_result["type"]
            logger.info(f"执行 SQL 查询: {sql_query} (类型: {query_type})")

            # 3 调用MCP
            ticket_result = asyncio.run(get_ticket_info(sql_query))

            # 4 格式化结果
            response = json.loads(ticket_result) if isinstance(ticket_result, str) else ticket_result
            logger.info(f"MCP 返回: {response}")
            # 检查响应状态
            if response.get("status") == "success":
                data = response.get("data", [])  # 提取数据列表
                response_text = ""  # 初始化响应文本
                for d in data:  # 遍历每个数据项
                    if query_type == "train":  # 火车票类型
                        response_text += f"{d['departure_city']}{d['arrival_city']} {d['departure_time']}: 车次 {d['train_number']}{d['seat_type']},票价 {d['price']}元,剩余 {d['remaining_seats']}\n"  # 格式化火车票文本
                    elif query_type == "flight":  # 机票类型
                        response_text += f"{d['departure_city']}{d['arrival_city']} {d['departure_time']}: 航班 {d['flight_number']}{d['cabin_type']},票价 {d['price']}元,剩余 {d['remaining_seats']}\n"  # 格式化机票文本
                    elif query_type == "concert":  # 演唱会类型
                        response_text += f"{d['city']} {d['start_time']}: {d['artist']} 演唱会,{d['ticket_type']},场地 {d['venue']},票价 {d['price']}元,剩余 {d['remaining_seats']}\n"  # 格式化演唱会文本
                if not response_text:  # 检查文本是否为空
                    response_text = "无结果。如果需要其他日期,请补充。"

                # 设置任务产物为文本部分,并设置任务状态为完成
                task.artifacts = [{"parts": [{"type": "text", "text": response_text}]}]
                task.status = TaskStatus(state=TaskState.COMPLETED)
            elif response.get("status") == "no_data":
                response_text = response.get("message", "请输出查询票务的详细信息。")

                # 设置任务状态为输入所需,添加追问消息
                task.status = TaskStatus(state=TaskState.INPUT_REQUIRED,
                                         message={"role": "agent", "content": {"text": response_text}})
            else:
                response_text = response.get("message", "查询失败,请重试或提供更多细节。")

                # 设置任务状态为失败,添加错误信息
                task.status = TaskStatus(state=TaskState.FAILED,
                                         message={"role": "agent", "content": {"text": response_text}})
            return task
        except Exception as e:  # 捕获异常
            logger.error(f"查询失败: {str(e)}")

            # 设置任务状态为失败,添加错误信息
            task.status = TaskStatus(state=TaskState.FAILED,
                                     message={"role": "agent", "content": {"text": f"查询失败: {str(e)} 请重试或提供更多细节。"}})
            return task

6 运行票务Agent服务器

if __name__ == "__main__":
    # 创建并运行服务器
    # 实例化票务查询服务器
    ticket_server = TicketQueryServer()
    # 打印服务器信息
    print("\n=== 服务器信息 ===")
    print(f"名称: {ticket_server.agent_card.name}")
    print(f"描述: {ticket_server.agent_card.description}")
    print("\n技能:")
    for skill in ticket_server.agent_card.skills:
        print(f"- {skill.name}: {skill.description}")
    # 运行服务器
    run_server(ticket_server, host="127.0.0.1", port=5006)

7 测试

位置:SmartVoyage/test/test_ticket_agent_server.py

import asyncio
import uuid

from python_a2a import A2AClient, Message, TextContent, MessageRole, Task
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger

conf = Config()

# 初始化 LLM
llm = ChatOpenAI(
            model=conf.model_name,
            api_key=conf.api_key,
            base_url=conf.base_url,
            temperature=0.1
        )


# 票务总结提示模板
ticket_prompt = ChatPromptTemplate.from_template(
    """
    您是一位票务解说员,以生动、引人入胜的风格为用户介绍票务查询结果。基于以下查询结果,生成一段总结:
    - 突出出发/到达城市、时间、类型、价格和剩余票数,提及关键亮点。
    - 使用解说员的语气,例如“欢迎来到票务宝库!今天我们为您精选了...”。
    - 如果结果为空,建议用户尝试其他查询条件。
    - 保持中文叙述,字数控制在 100-150 字。

    查询结果:
    {tickets}

    总结:
    """
)

def main():
    # 初始化票务客户端
    ticket_client = A2AClient("http://localhost:5006")

    # 获取票务代理信息
    try:
        logger.info("获取票务助手信息")
        logger.info(f"名称: {ticket_client.agent_card.name}")
        logger.info(f"描述: {ticket_client.agent_card.description}")
        logger.info(f"版本: {ticket_client.agent_card.version}")
        if ticket_client.agent_card.skills:
            logger.info("支持技能:")
            for skill in ticket_client.agent_card.skills:
                logger.info(f"- {skill.name}: {skill.description}")
                if skill.examples:
                    logger.info(f"  示例: {', '.join(skill.examples)}")
    except Exception as e:
        logger.error(f"无法获取票务助手信息: {str(e)}")

    # 交互循环
    while True:
        user_input = input("输入您的票务查询(输入 'exit' 退出):")
        if user_input.lower() == 'exit':
            break

        try:
            query = user_input.strip()
            logger.info(f"用户查询 (票务): {query}")

            # 发送查询
            logger.info("正在查询数据...")
            message_ticket = Message(content=TextContent(text=query), role=MessageRole.USER)
            task_ticket = Task(id="task-" + str(uuid.uuid4()), message=message_ticket.to_dict())

            # 发送任务并获取最终结果
            ticket_result_task = asyncio.run(ticket_client.send_task_async(task_ticket))
            logger.info(f"原始响应: {ticket_result_task}")

            # 生成 LLM 总结
            if ticket_result_task.status.state == 'completed':
                try:
                    summary_chain = ticket_prompt | llm
                    ticket_result = ticket_result_task.artifacts[0]["parts"][0]["text"]
                    summary = summary_chain.invoke({"tickets": ticket_result}).content.strip()
                    logger.info(f"**票务解说员总结**:\n{summary}")
                except Exception as e:
                    error_message = f"生成总结失败: {str(e)}"
                    logger.error(error_message)
            else:
                logger.info(ticket_result_task.status.message['content']['text'])
        except Exception as e:
            error_message = f"查询失败: {str(e)}"
            logger.error(error_message)


if __name__ == "__main__":
    print("票务agent server查询客户端测试脚本")
    main()

三、订票Agent服务器

order_server.py:订票代理服务器,首先根据用户的意图去调用票务Agent服务器查询余票信息,然后进行调用订票MCP服务器完成订票。

作用:对用户的订票需求进行分析,先调用票务Agent服务器查询余票信息,如果有余票则完成订票,否则让用户修改需求。

项目中的定位:执行层,接收路由任务,查询余票并完成订票。

核心功能

  • 根据用户的意图去调用票务Agent服务器查询余票信息。
  • 根据余票信息调用订票MCP服务器完成订票。

1 导包与配置

位置:SmartVoyage/a2a_server/order_server.py

import asyncio
import uuid

from langchain_openai import ChatOpenAI
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from python_a2a import AgentCard, AgentSkill, run_server, TaskStatus, TaskState, A2AServer, A2AClient, Message, \
    TextContent, MessageRole, Task

from SmartVoyage.create_logger import logger
from agent_learn.config import Config

conf = Config()

# 初始化LLM
llm = ChatOpenAI(
    model=conf.model_name,
    base_url=conf.base_url,
    api_key=conf.api_key,
    temperature=0.1
)

2 订票函数

# 定义订票函数
async def order_tickets(query):
    try:
        # 启动 MCP server,通过streamable建立连接
        async with streamablehttp_client("http://127.0.0.1:8003/mcp") as (read, write, _):
            # 使用读写通道创建 MCP 会话
            async with ClientSession(read, write) as session:
                try:
                    await session.initialize()

                    # 从 session 自动获取 MCP server 提供的工具列表。
                    tools = await load_mcp_tools(session)
                    # print(f"tools-->{tools}")

                    # 创建 agent 的提示模板
                    prompt = ChatPromptTemplate.from_messages([
                        ("system",
                         "你是一个票务预定助手,能够调用工具来完成火车票、飞机票或演出票的预定。你需要仔细分析工具需要的参数,然后从用户提供的信息中提取信息。如果用户提供的信息不足以提取到调用工具所有必要参数,则向用户追问,以获取该信息。不能自己编撰参数。"),
                        ("human", "{input}"),
                        ("placeholder", "{agent_scratchpad}"),
                    ])

                    # 构建工具调用代理
                    agent = create_tool_calling_agent(llm, tools, prompt)

                    # 创建代理执行器
                    agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

                    # 代理调用
                    response = await agent_executor.ainvoke({"input": query})

                    return {"status": "success", "message": f"{response['output']}"}
                except Exception as e:
                    logger.error(f"票务 MCP 测试出错:{str(e)}")
                    return {"status": "error", "message": f"票务 MCP 查询出错:{str(e)}"}
    except Exception as e:
        logger.error(f"连接或会话初始化时发生错误: {e}")
        return {"status": "error", "message": "连接或会话初始化时发生错误"}

3 AgentCard定义

# Agent 卡片定义
agent_card = AgentCard(
    name="TicketOrderAssistant",
    description="通过MCP提供票务预定服务的助手",
    url="http://localhost:5007",
    version="1.0.4",
    capabilities={"streaming": True, "memory": True},
    skills=[
        AgentSkill(
            name="execute ticket order",
            description="根据客户端提供的输入执行票务预定,返回执行结果",
            examples=["北京 到 上海 2025-11-15 火车票 二等座 1张",
                      "上海 到 北京 2025-12-11 飞机票 公务舱 2张"]
        )
    ]
)

4 TicketOrderServer实现

# 票务预定服务器类
class TicketOrderServer(A2AServer):
    def __init__(self):
        super().__init__(agent_card=agent_card)
        self.llm = llm
        self.ticket_client = A2AClient("http://localhost:5006")

    # 处理任务:提取输入,查询余票,调用MCP,结果输出
    def handle_task(self, task):
        # 1 提取输入
        content = (task.message or {}).get("content", {})  # 从消息中获取内容
        # 提取conversation,即客户端发起的任务中的query语句
        conversation = content.get("text", "") if isinstance(content, dict) else ""
        logger.info(f"对话历史及用户问题: {conversation}")

        try:
            # 2 调用票务查询agent查询余票
            message_ticket = Message(content=TextContent(text=conversation), role=MessageRole.USER)
            task_ticket = Task(id="task-" + str(uuid.uuid4()), message=message_ticket.to_dict())

            # 发送任务并获取最终结果
            ticket_result_task = asyncio.run(self.ticket_client.send_task_async(task_ticket))
            logger.info(f"原始响应: {ticket_result_task}")

            # 处理结果:未查到余票信息时,则返回提示信息
            if ticket_result_task.status.state != 'completed':
                required_message = ticket_result_task.status.message['content']['text']
                logger.info(f'余票未查到:{required_message}')
                task.status = TaskStatus(state=TaskState.INPUT_REQUIRED,
                                         message={"role": "agent", "content": {"text": required_message}})
                return task
            # 处理结果:查到余票信息时,进行订票
            ticket_result = ticket_result_task.artifacts[0]["parts"][0]["text"]
            logger.info(f"余票信息: {ticket_result}")

            # 3 调用MCP订票
            order_result = asyncio.run(order_tickets(conversation + '\n余票信息:' + ticket_result))
            logger.info(f"MCP 返回: {order_result}")

            # 4 结果输出
            data = order_result.get("message", '')
            logger.info(f"订票结果: {data}")
            # 检查响应状态
            if order_result.get("status") == "success":
                result = '余票信息:' + ticket_result + '\n订票结果:' + data
                # 设置任务产物为文本部分,并设置任务状态为完成
                task.artifacts = [{"parts": [{"type": "text", "text": result}]}]
                task.status = TaskStatus(state=TaskState.COMPLETED)
            else:
                # 设置任务状态为失败,添加错误信息
                task.status = TaskStatus(state=TaskState.FAILED,
                                         message={"role": "agent", "content": {"text": data}})
            return task
        except Exception as e:  # 捕获异常
            logger.error(f"查询失败: {str(e)}")

            # 设置任务状态为失败,添加错误信息
            task.status = TaskStatus(state=TaskState.FAILED,
                                     message={"role": "agent", "content": {"text": f"查询失败: {str(e)} 请重试或提供更多细节。"}})
            return task

5 运行订票Agent服务器

if __name__ == "__main__":
    # 创建并运行服务器
    # 实例化票务查询服务器
    ticket_server = TicketOrderServer()
    # 打印服务器信息
    print("\n=== 服务器信息 ===")
    print(f"名称: {ticket_server.agent_card.name}")
    print(f"描述: {ticket_server.agent_card.description}")
    print("\n技能:")
    for skill in ticket_server.agent_card.skills:
        print(f"- {skill.name}: {skill.description}")
    # 运行服务器
    run_server(ticket_server, host="127.0.0.1", port=5007)

6 测试

位置:SmartVoyage/test/test_order_agent_server.py

import asyncio
import uuid

from python_a2a import A2AClient, Message, TextContent, MessageRole, Task

from SmartVoyage.config import Config
from SmartVoyage.create_logger import logger

conf = Config()


def main():
    # 初始化票务客户端
    ticket_client = A2AClient("http://localhost:5007")

    # 获取票务代理信息
    try:
        logger.info("获取票务预定助手信息")
        logger.info(f"名称: {ticket_client.agent_card.name}")
        logger.info(f"描述: {ticket_client.agent_card.description}")
        logger.info(f"版本: {ticket_client.agent_card.version}")
        if ticket_client.agent_card.skills:
            logger.info("支持技能:")
            for skill in ticket_client.agent_card.skills:
                logger.info(f"- {skill.name}: {skill.description}")
                if skill.examples:
                    logger.info(f"  示例: {', '.join(skill.examples)}")
    except Exception as e:
        logger.error(f"无法获取票务助手信息: {str(e)}")

    # 交互循环
    while True:
        user_input = input("输入您的票务预定需求(输入 'exit' 退出):")
        if user_input.lower() == 'exit':
            break

        try:
            query = user_input.strip()
            logger.info(f"用户查询 (票务): {query}")

            # 发送查询
            logger.info("正在查询数据...")
            message_ticket = Message(content=TextContent(text=query), role=MessageRole.USER)
            task_ticket = Task(id="task-" + str(uuid.uuid4()), message=message_ticket.to_dict())

            # 发送任务并获取最终结果
            ticket_result_task = asyncio.run(ticket_client.send_task_async(task_ticket))
            logger.info(f"原始响应: {ticket_result_task}")

            # 打印输出
            if ticket_result_task.status.state == 'completed':
                print(ticket_result_task.artifacts[0]["parts"][0]["text"])
            else:
                print(ticket_result_task.status.message['content']['text'])
        except Exception as e:
            error_message = f"查询失败: {str(e)}"
            logger.error(error_message)


if __name__ == "__main__":
    print("票务预定agent server查询客户端测试脚本")
    main()

本节小结

本节主要描述了smartVoyage使用到的所有Agent。