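3.4 Collecting and Updating Weather Data

Learning objectives

  • Learn how to collect weather data
  • Learn how to fetch data and refresh it in a database on a schedule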

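I. Data collection

spider_weather is the weather data collection script: it fetches data from an API and writes it to MySQL, keeping the database up to date so that the agents can query it. Data source API: https://dev.qweather.com/docs/api/weather/weather-daily-forecast/

Role in the project: back-end data source, run on a schedule.

Core functions: API requests, data parsing, database insert/update, and scheduling.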

1 Obtaining an API Host

API Host overview: https://dev.qweather.com/docs/configuration/api-config/
Where to find your API Host: https://console.qweather.com/setting?lang=zh

2 Obtaining an API Key

API Key overview: https://dev.qweather.com/docs/configuration/authentication/#api-key
Where to find your API Key: https://console.qweather.com/project?lang=zh

3 Verifying the setup

On Ubuntu (3-day forecast):

curl --compressed \
  -H "X-QW-Api-Key: 9ef68fe55401485180dd968fac902300" \
  "https://m7487r6ych.re.qweatherapi.com/v7/weather/3d?location=101010100"

On Windows (30-day forecast):

curl --compressed -H "X-QW-Api-Key: 9ef68fe55401485180dd968fac902300" "https://m7487r6ych.re.qweatherapi.com/v7/weather/30d?location=101010100"

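Python code:

import requests
import json

# Configuration (replace with your own key)
API_KEY = "9ef68fe55401485180dd968fac902300"
url = "https://m7487r6ych.re.qweatherapi.com/v7/weather/30d?location=101010100"  # 30-day forecast for Beijing
headers = {
    "X-QW-Api-Key": API_KEY,
    "Accept-Encoding": "gzip"  # ask for gzip; requests decompresses the body transparently
}
try:
    print("Requesting the API...")
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()  # turn HTTP error statuses into exceptions
    parsed_data = json.loads(response.text)
    print("Parsed successfully!")
    print(parsed_data)
except requests.RequestException as e:
    print(f"Request failed: {e}")
except json.JSONDecodeError as e:
    print(f"Response was not valid JSON: {e}")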

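TIPS:

Transfer encoding: gzip

When it applies: the data is compressible (JSON, HTML, plain text) and bandwidth is limited or concurrency is high. A server normally compresses a response only when the client advertises support through the Accept-Encoding header. The requests library sends an Accept-Encoding header that includes gzip by default and decompresses gzip bodies automatically.

Benefits: lower latency, less bandwidth, and a better user experience.

Official guide: https://dev.qweather.com/docs/best-practices/gzip/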
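To make the bandwidth saving concrete, the sketch below compresses a JSON payload with the standard-library gzip module and compares sizes. The payload is hypothetical: it only imitates the shape of a daily-forecast response, and the values are made up for illustration.

```python
import gzip
import json

# Hypothetical payload shaped like a daily-forecast response (values are made up).
payload = {
    "code": "200",
    "daily": [
        {"fxDate": f"2024-01-{day:02d}", "tempMax": "5", "tempMin": "-3", "textDay": "Sunny"}
        for day in range(1, 31)
    ],
}

raw = json.dumps(payload).encode("utf-8")
compressed = gzip.compress(raw)

print(f"raw: {len(raw)} bytes, gzip: {len(compressed)} bytes")

# Decompressing restores the original content exactly.
restored = json.loads(gzip.decompress(compressed).decode("utf-8"))
```

Repetitive JSON like a 30-day forecast compresses well because the same keys recur in every entry.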

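II. Scheduled data updates

1 Imports and configuration

The configuration below covers the weather API and the database. The script fetches forecasts from the weather API and stores them in the database that the A2A agents query.

Location: SmartVoyage/utils/spider_weather.py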

import requests
import mysql.connector
from datetime import datetime, timedelta
import schedule
import time
import json
import gzip
import pytz
from SmartVoyage.config import Config

conf = Config()

# Configuration
API_KEY = "5ef0a47e161a4ea997227322317eae83"
city_codes = {
    "北京": "101010100",
    "上海": "101020100",
    "广州": "101280101",
    "深圳": "101280601"
}
BASE_URL = "https://m7487r6ych.re.qweatherapi.com/v7/weather/30d"
TZ = pytz.timezone('Asia/Shanghai')  # Shanghai time zone (UTC+08:00)

# MySQL configuration
db_config = {
    "host": conf.host,
    "user": conf.user,
    "password": conf.password,
    "database": conf.database,
    "charset": "utf8mb4"
}
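The Config class imported from SmartVoyage.config is not shown in this section. As a rough sketch only, a class exposing the four attributes used above (host, user, password, database) could read them from environment variables. The environment variable names and the default values here are assumptions for illustration, not the project's actual settings.

```python
import os


class Config:
    """Hypothetical stand-in for SmartVoyage.config.Config (the real class is not shown)."""

    def __init__(self):
        # Variable names and defaults are assumptions for illustration.
        self.host = os.environ.get("MYSQL_HOST", "localhost")
        self.user = os.environ.get("MYSQL_USER", "root")
        self.password = os.environ.get("MYSQL_PASSWORD", "")
        self.database = os.environ.get("MYSQL_DATABASE", "travel_rag")


conf = Config()
db_config = {
    "host": conf.host,
    "user": conf.user,
    "password": conf.password,
    "database": conf.database,
    "charset": "utf8mb4",
}
```

Keeping credentials out of the source file this way also makes it easier to follow the advice to use your own key and password.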

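2 Connecting to the database

connect_db function

Goal: open a connection to the MySQL database.

What it does: connects to MySQL using db_config and returns the connection object.

Inputs and outputs

Input: none.

Output: a mysql.connector connection object (for example mysql.connector.connection.MySQLConnection).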

def connect_db():
    return mysql.connector.connect(**db_config)

Test:

if __name__ == '__main__':
    conn = connect_db()
    print(conn.is_connected())
    if conn.is_connected():
        print("Database connection succeeded!")
    conn.close()

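3 Fetching data

The fetch_weather_data function fetches and parses the weather data.

Goal: get the 30-day weather forecast from the QWeather API.

What it does: sends a GET request that accepts gzip-compressed responses and parses the returned JSON.

Inputs and outputs

Input: city (string, e.g. "北京"), location (string, e.g. "101010100").

Output: a dict parsed from the JSON response (it contains the daily forecast list), or None on failure.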

def fetch_weather_data(city, location):
    headers = {
        "X-QW-Api-Key": API_KEY,
        "Accept-Encoding": "gzip"
    }
    url = f"{BASE_URL}?location={location}"
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        # requests decompresses gzip bodies transparently, so response.text is
        # already plain JSON text; calling gzip.decompress on it would fail.
        return json.loads(response.text)
    except requests.RequestException as e:
        print(f"Request for {city} weather data failed: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"{city}: JSON parse error: {e}; response body: {response.text[:500]}...")
        return None

Test:

if __name__ == "__main__":
    weather_data = fetch_weather_data("北京", city_codes["北京"])
    print(weather_data)
    if weather_data is not None:
        print("Parsed successfully!")
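Once the response has been parsed, the forecast entries sit in the daily list. The sketch below shows one way to pull a few fields out of it. The helper name summarize_daily and the sample values are hypothetical; the field names (code, daily, fxDate, tempMin, tempMax, textDay) are the ones this script reads, and the sample assumes numeric fields arrive as strings.

```python
def summarize_daily(data):
    """Return (date, low, high, description) tuples, or [] if the response is unusable."""
    if not data or data.get("code") != "200":
        return []
    return [
        (day["fxDate"], int(day["tempMin"]), int(day["tempMax"]), day.get("textDay"))
        for day in data.get("daily", [])
    ]


# Made-up sample in the shape of a parsed response.
sample = {
    "code": "200",
    "updateTime": "2024-01-01T08:00+08:00",
    "daily": [
        {"fxDate": "2024-01-01", "tempMin": "-3", "tempMax": "5", "textDay": "Sunny"},
        {"fxDate": "2024-01-02", "tempMin": "-1", "tempMax": "7", "textDay": "Cloudy"},
    ],
}

rows = summarize_daily(sample)
for row in rows:
    print(row)
```

Checking code before touching daily mirrors what the storage step does: a response with any other code is treated as invalid.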

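4 Querying the last update time

get_latest_update_time function

Goal: look up the most recent update time stored for a given city.

What it does: runs a SQL query and returns the latest update_time for that city in the weather_data table.

Inputs and outputs

Input: cursor (MySQL cursor), city (string, e.g. "北京").

Output: a datetime object, or None if the city has no rows yet.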

def get_latest_update_time(cursor, city):
    cursor.execute("SELECT MAX(update_time) FROM weather_data WHERE city = %s", (city,))
    result = cursor.fetchone()
    return result[0] if result[0] else None

Test:

if __name__ == "__main__":
    # Open the database connection
    conn = connect_db()
    cursor = conn.cursor()

    # Print the most recent update time stored for Beijing
    print(get_latest_update_time(cursor, '北京'))

    # Close the database connection
    cursor.close()
    conn.close()
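The function checks result[0] rather than result itself because an aggregate query such as SELECT MAX(...) returns one row even when no rows match, and that row holds NULL. The sketch below illustrates this with the standard-library sqlite3 module as a stand-in for MySQL (an assumption made so the example runs without a server); the table is reduced to two columns and the timestamps are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE weather_data (city TEXT, update_time TEXT)")
cur.execute("INSERT INTO weather_data VALUES (?, ?)", ("北京", "2024-01-01 08:00:00"))
cur.execute("INSERT INTO weather_data VALUES (?, ?)", ("北京", "2024-01-02 08:00:00"))


def latest(cur, city):
    # sqlite3 uses ? placeholders where mysql.connector uses %s.
    cur.execute("SELECT MAX(update_time) FROM weather_data WHERE city = ?", (city,))
    row = cur.fetchone()  # an aggregate without GROUP BY always yields one row
    return row[0]         # None when no rows matched


known = latest(cur, "北京")
unknown = latest(cur, "上海")
print(known, unknown)
conn.close()
```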

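5 Deciding whether an update is needed

should_update_data function

Goal: decide whether a city's weather data needs to be refreshed.

What it does: returns True when an update is forced, when no data exists yet, or when the last update is more than 1 day old.

Inputs and outputs

Input: latest_time (datetime or None), force_update (boolean).

Output: boolean (True/False).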

def should_update_data(latest_time, force_update=False):
    if force_update:
        return True
    if latest_time is None:
        return True

    # MySQL DATETIME values come back without a time zone; attach Shanghai time.
    # With pytz, use localize(): replace(tzinfo=TZ) would apply the zone's
    # historical LMT offset (+08:06) instead of +08:00.
    if latest_time.tzinfo is None:
        latest_time = TZ.localize(latest_time)

    current_time = datetime.now(TZ)
    return (current_time - latest_time) > timedelta(days=1)

Test:

if __name__ == "__main__":
    # Simulate a last-update time two days in the past
    latest = datetime.now(TZ) - timedelta(days=2)
    print("======== Simulated timestamp from two days ago ========")
    print(latest)

    # Report the outcome of the freshness check
    if should_update_data(latest):
        print(f"Update needed; last update was at {latest}")
    else:
        print("Data is fresh; no update needed.")
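The freshness rule is easier to test when the current time is passed in rather than read from the clock. The sketch below is a standard-library variant of the same check. It swaps pytz for a fixed UTC+08:00 offset (China observes no daylight saving time, so the two agree), and the name is_stale and the now parameter are hypothetical.

```python
from datetime import datetime, timedelta, timezone

CST = timezone(timedelta(hours=8))  # China Standard Time as a fixed UTC+08:00 offset


def is_stale(latest_time, now, max_age=timedelta(days=1)):
    """Return True when there is no data or the last update is older than max_age."""
    if latest_time is None:
        return True
    if latest_time.tzinfo is None:
        # Attaching a fixed-offset zone with replace() is safe.
        latest_time = latest_time.replace(tzinfo=CST)
    return now - latest_time > max_age


now = datetime(2024, 1, 3, 1, 0, tzinfo=CST)
print(is_stale(None, now))                           # no data yet
print(is_stale(datetime(2024, 1, 1, 0, 0), now))     # two days old
print(is_stale(datetime(2024, 1, 2, 12, 0), now))    # 13 hours old
```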

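6 Storing data

store_weather_data function

Goal: insert or update the forecast data in the database.

What it does: loops over the daily forecasts and uses INSERT ... ON DUPLICATE KEY UPDATE to insert or update rows in the weather_data table.

Inputs and outputs

Input: conn (database connection), cursor (MySQL cursor), city (string), data (the parsed API response).

Output: none; the database is updated as a side effect.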

def store_weather_data(conn, cursor, city, data):
    if not data or data.get("code") != "200":
        print(f"{city}: invalid data, skipping storage.")
        return

    daily_data = data.get("daily", [])
    # updateTime already carries its UTC offset (e.g. +08:00); parse it and express it in Shanghai time
    update_time = datetime.fromisoformat(data["updateTime"]).astimezone(TZ)

    for day in daily_data:
        fx_date = datetime.strptime(day["fxDate"], "%Y-%m-%d").date()
        values = (
            city, fx_date,
            day.get("sunrise"), day.get("sunset"),
            day.get("moonrise"), day.get("moonset"),
            day.get("moonPhase"), day.get("moonPhaseIcon"),
            day.get("tempMax"), day.get("tempMin"),
            day.get("iconDay"), day.get("textDay"),
            day.get("iconNight"), day.get("textNight"),
            day.get("wind360Day"), day.get("windDirDay"), day.get("windScaleDay"), day.get("windSpeedDay"),
            day.get("wind360Night"), day.get("windDirNight"), day.get("windScaleNight"), day.get("windSpeedNight"),
            day.get("precip"), day.get("uvIndex"),
            day.get("humidity"), day.get("pressure"),
            day.get("vis"), day.get("cloud"),
            update_time
        )
        insert_query = """
        INSERT INTO weather_data (
            city, fx_date, sunrise, sunset, moonrise, moonset, moon_phase, moon_phase_icon,
            temp_max, temp_min, icon_day, text_day, icon_night, text_night,
            wind360_day, wind_dir_day, wind_scale_day, wind_speed_day,
            wind360_night, wind_dir_night, wind_scale_night, wind_speed_night,
            precip, uv_index, humidity, pressure, vis, cloud, update_time
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            sunrise = VALUES(sunrise), sunset = VALUES(sunset), moonrise = VALUES(moonrise),
            moonset = VALUES(moonset), moon_phase = VALUES(moon_phase), moon_phase_icon = VALUES(moon_phase_icon),
            temp_max = VALUES(temp_max), temp_min = VALUES(temp_min), icon_day = VALUES(icon_day),
            text_day = VALUES(text_day), icon_night = VALUES(icon_night), text_night = VALUES(text_night),
            wind360_day = VALUES(wind360_day), wind_dir_day = VALUES(wind_dir_day), wind_scale_day = VALUES(wind_scale_day),
            wind_speed_day = VALUES(wind_speed_day), wind360_night = VALUES(wind360_night),
            wind_dir_night = VALUES(wind_dir_night), wind_scale_night = VALUES(wind_scale_night),
            wind_speed_night = VALUES(wind_speed_night), precip = VALUES(precip), uv_index = VALUES(uv_index),
            humidity = VALUES(humidity), pressure = VALUES(pressure), vis = VALUES(vis),
            cloud = VALUES(cloud), update_time = VALUES(update_time)
        """
        try:
            cursor.execute(insert_query, values)
            print(f"{city} {fx_date} inserted/updated: {day.get('textDay')}, rows affected: {cursor.rowcount}")
            conn.commit()
            print(f"{city}: transaction committed.")
        except mysql.connector.Error as e:
            print(f"{city} {fx_date} database error: {e}")
            conn.rollback()
            print(f"{city}: transaction rolled back.")

Test:

if __name__ == "__main__":
    conn = connect_db()
    cursor = conn.cursor()
    data = fetch_weather_data("北京", "101010100")
    store_weather_data(conn, cursor, "北京", data)
    print("Data storage finished.")
    cursor.close()
    conn.close()
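The insert-or-update behaviour depends on the unique key over (city, fx_date): a second write for the same city and date updates the existing row instead of adding a new one. The sketch below shows the idea with the standard-library sqlite3 module as a stand-in for MySQL. SQLite spells the clause ON CONFLICT ... DO UPDATE rather than ON DUPLICATE KEY UPDATE, and the table is cut down to three columns with made-up values.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("""
    CREATE TABLE weather_data (
        city TEXT NOT NULL,
        fx_date TEXT NOT NULL,
        temp_max INTEGER,
        UNIQUE (city, fx_date)
    )
""")

# SQLite's upsert syntax; MySQL uses ON DUPLICATE KEY UPDATE for the same effect.
upsert = """
    INSERT INTO weather_data (city, fx_date, temp_max) VALUES (?, ?, ?)
    ON CONFLICT (city, fx_date) DO UPDATE SET temp_max = excluded.temp_max
"""
cur.execute(upsert, ("北京", "2024-01-01", 3))  # first write inserts the row
cur.execute(upsert, ("北京", "2024-01-01", 5))  # second write updates it
conn.commit()

rows = cur.execute("SELECT city, fx_date, temp_max FROM weather_data").fetchall()
print(rows)
conn.close()
```

Because the forecast for a given date changes as the date approaches, this pattern lets each daily run overwrite older predictions without creating duplicates.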

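7 Updating data

update_weather function

Goal: refresh the data for every configured city.

What it does: for each city, checks whether an update is due and, if so, fetches the data and stores it.

Inputs and outputs:

Input: force_update (boolean; when True, every city is refreshed regardless of how old its data is).

Output: none; the database is updated as a side effect.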

def update_weather(force_update=False):
    conn = connect_db()
    cursor = conn.cursor()

    try:
        for city, location in city_codes.items():
            latest_time = get_latest_update_time(cursor, city)
            if should_update_data(latest_time, force_update):
                print(f"Updating weather data for {city}...")
                data = fetch_weather_data(city, location)
                if data:
                    store_weather_data(conn, cursor, city, data)
            else:
                print(f"{city} is already up to date; no update needed. Last update: {latest_time}")
    finally:
        # Release the connection even if one city fails
        cursor.close()
        conn.close()

Test:

if __name__ == "__main__":
    update_weather(force_update=True)
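The control flow of the update loop can be exercised without an API key or a database by passing the four steps in as functions. This is only a sketch of the same flow under that assumption: update_all and its parameters are hypothetical names, and the lambdas are fakes that stand in for the real query, freshness check, fetch, and store functions.

```python
def update_all(cities, get_latest, needs_update, fetch, store):
    """Fetch and store data for each city that needs it; return the cities updated."""
    updated = []
    for city, location in cities.items():
        if not needs_update(get_latest(city)):
            continue  # data is fresh, skip this city
        data = fetch(city, location)
        if data:      # a failed fetch returns None and is skipped
            store(city, data)
            updated.append(city)
    return updated


stored = {}
cities = {"北京": "101010100", "上海": "101020100", "广州": "101280101"}
result = update_all(
    cities,
    get_latest=lambda city: "fresh" if city == "上海" else None,
    needs_update=lambda latest: latest is None,
    fetch=lambda city, loc: None if city == "广州" else {"code": "200", "daily": []},
    store=lambda city, data: stored.update({city: data}),
)
print(result)
```

Here 上海 is skipped because its data is fresh and 广州 is skipped because its fetch fails, so only 北京 is stored.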

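8 Scheduled updates

setup_scheduler function

Goal: register a scheduled job that calls update_weather once a day at 16:00, so the stored forecasts stay current.

Note on time zones: at("16:00") is interpreted in the local time zone of the machine running the script. On a host set to PDT (UTC-7), 16:00 is 07:00 the next day in Beijing (UTC+8). For a run at 01:00 Beijing time, a host on PDT would use "10:00" and a host on Beijing time would use "01:00".

What it does

Registers a daily job with the schedule library.

Enters an infinite loop that runs any pending jobs, checking every 60 seconds.

Role in the project: keeps the weather data refreshed on a schedule so that the weather_data table stays fresh for queries from weather_server.py and mcp_weather_server.py.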

def setup_scheduler():
    # Runs at 16:00 in the host's local time zone (16:00 PDT is 07:00 the next day in Beijing)
    schedule.every().day.at("16:00").do(update_weather)
    while True:
        schedule.run_pending()
        time.sleep(60)
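Because the time string passed to at() is read in the host's local time zone, it helps to compute which local wall-clock time matches the intended Beijing time. The sketch below does the conversion with fixed UTC offsets from the standard library. The helper name local_at_string is hypothetical, and PDT is modelled as a constant UTC-07:00, which holds only while daylight saving time is in effect.

```python
from datetime import datetime, timedelta, timezone

BEIJING = timezone(timedelta(hours=8))  # UTC+08:00
PDT = timezone(timedelta(hours=-7))     # UTC-07:00 (Pacific Daylight Time)


def local_at_string(beijing_hour, local_tz):
    """Return the HH:MM wall-clock time in local_tz that matches beijing_hour:00 in Beijing."""
    # The calendar date is arbitrary: with fixed offsets the result does not depend on it.
    target = datetime(2024, 7, 1, beijing_hour, 0, tzinfo=BEIJING)
    return target.astimezone(local_tz).strftime("%H:%M")


print(local_at_string(1, PDT))      # 10:00
print(local_at_string(1, BEIJING))  # 01:00
```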

Testing the mechanism:

from datetime import datetime, timedelta
import time
import schedule

now = datetime.now()
trigger_time = (now + timedelta(seconds=20)).strftime("%H:%M:%S")

print(f"[test log] Current time: {now}")
print(f"[test log] Job scheduled to fire at {trigger_time}")

# A lambda stands in for update_weather so the test does not touch the API or the database
schedule.every().day.at(trigger_time).do(lambda: print("Job fired!"))

# Run for 60 seconds to watch the job fire
end_time = now + timedelta(seconds=60)
while datetime.now() < end_time:
    schedule.run_pending()
    print(f"[test log] Checking pending jobs: {datetime.now()}")
    time.sleep(1)
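Conceptually, the polling loop does little more than compare the current time with each job's next run time. The sketch below is a simplified illustration of that idea using only the standard library. It is not how the schedule library is implemented; the DailyJob class is hypothetical, and the clock is simulated so the example finishes instantly.

```python
from datetime import datetime, timedelta


class DailyJob:
    """A job that should run once per day at a fixed wall-clock time."""

    def __init__(self, hour, minute, func, now):
        self.func = func
        # First run: today at hour:minute, or tomorrow if that time has already passed.
        self.next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if self.next_run <= now:
            self.next_run += timedelta(days=1)

    def run_if_due(self, now):
        """Run the job when its time has arrived, then move the next run to tomorrow."""
        if now >= self.next_run:
            self.func()
            self.next_run += timedelta(days=1)
            return True
        return False


fired = []
start = datetime(2024, 1, 1, 15, 59, 0)
job = DailyJob(16, 0, lambda: fired.append("update"), now=start)

# Simulate three polls, 60 seconds apart, instead of sleeping.
results = [job.run_if_due(start + timedelta(seconds=60 * i)) for i in range(3)]
print(results)
```

With a 60-second polling interval, a job can start up to a minute after its nominal time, which is acceptable for a once-a-day refresh.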

9 Complete code

Location: SmartVoyage/utils/spider_weather.py

import requests
import mysql.connector
from datetime import datetime, timedelta
import schedule
import time
import json
import gzip
import pytz

# Configuration
API_KEY = "5ef0a47e161a4ea997227322317eae83"
city_codes = {
    "北京": "101010100",
    "上海": "101020100",
    "广州": "101280101",
    "深圳": "101280601"
}
BASE_URL = "https://m7487r6ych.re.qweatherapi.com/v7/weather/30d"
TZ = pytz.timezone('Asia/Shanghai')  # Shanghai time zone (UTC+08:00)

# MySQL configuration
db_config = {
    "host": "localhost",
    "user": "root",
    "password": "123456",
    "database": "travel_rag",
    "charset": "utf8mb4"
}

def connect_db():
    return mysql.connector.connect(**db_config)

def fetch_weather_data(city, location):
    headers = {
        "X-QW-Api-Key": API_KEY,
        "Accept-Encoding": "gzip"
    }
    url = f"{BASE_URL}?location={location}"
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        # requests decompresses gzip bodies transparently, so response.text is
        # already plain JSON text; calling gzip.decompress on it would fail.
        return json.loads(response.text)
    except requests.RequestException as e:
        print(f"Request for {city} weather data failed: {e}")
        return None
    except json.JSONDecodeError as e:
        print(f"{city}: JSON parse error: {e}; response body: {response.text[:500]}...")
        return None

def get_latest_update_time(cursor, city):
    cursor.execute("SELECT MAX(update_time) FROM weather_data WHERE city = %s", (city,))
    result = cursor.fetchone()
    return result[0] if result[0] else None

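def should_update_data(latest_time, force_update=False):
    if force_update:
        return True
    if not latest_time:
        return True
    current_time = datetime.now(TZ)
    # With pytz, attach the zone with localize(); replace(tzinfo=TZ) would
    # apply the zone's historical LMT offset (+08:06) instead of +08:00.
    if latest_time.tzinfo is None:
        latest_time = TZ.localize(latest_time)
    return (current_time - latest_time).total_seconds() / 3600 >= 24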

def store_weather_data(conn, cursor, city, data):
    if not data or data.get("code") != "200":
        print(f"{city}: invalid data, skipping storage.")
        return

    daily_data = data.get("daily", [])
    # updateTime already carries its UTC offset (e.g. +08:00); parse it and express it in Shanghai time
    update_time = datetime.fromisoformat(data["updateTime"]).astimezone(TZ)

    for day in daily_data:
        fx_date = datetime.strptime(day["fxDate"], "%Y-%m-%d").date()
        values = (
            city, fx_date,
            day.get("sunrise"), day.get("sunset"),
            day.get("moonrise"), day.get("moonset"),
            day.get("moonPhase"), day.get("moonPhaseIcon"),
            day.get("tempMax"), day.get("tempMin"),
            day.get("iconDay"), day.get("textDay"),
            day.get("iconNight"), day.get("textNight"),
            day.get("wind360Day"), day.get("windDirDay"), day.get("windScaleDay"), day.get("windSpeedDay"),
            day.get("wind360Night"), day.get("windDirNight"), day.get("windScaleNight"), day.get("windSpeedNight"),
            day.get("precip"), day.get("uvIndex"),
            day.get("humidity"), day.get("pressure"),
            day.get("vis"), day.get("cloud"),
            update_time
        )
        insert_query = """
        INSERT INTO weather_data (
            city, fx_date, sunrise, sunset, moonrise, moonset, moon_phase, moon_phase_icon,
            temp_max, temp_min, icon_day, text_day, icon_night, text_night,
            wind360_day, wind_dir_day, wind_scale_day, wind_speed_day,
            wind360_night, wind_dir_night, wind_scale_night, wind_speed_night,
            precip, uv_index, humidity, pressure, vis, cloud, update_time
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            sunrise = VALUES(sunrise), sunset = VALUES(sunset), moonrise = VALUES(moonrise),
            moonset = VALUES(moonset), moon_phase = VALUES(moon_phase), moon_phase_icon = VALUES(moon_phase_icon),
            temp_max = VALUES(temp_max), temp_min = VALUES(temp_min), icon_day = VALUES(icon_day),
            text_day = VALUES(text_day), icon_night = VALUES(icon_night), text_night = VALUES(text_night),
            wind360_day = VALUES(wind360_day), wind_dir_day = VALUES(wind_dir_day), wind_scale_day = VALUES(wind_scale_day),
            wind_speed_day = VALUES(wind_speed_day), wind360_night = VALUES(wind360_night),
            wind_dir_night = VALUES(wind_dir_night), wind_scale_night = VALUES(wind_scale_night),
            wind_speed_night = VALUES(wind_speed_night), precip = VALUES(precip), uv_index = VALUES(uv_index),
            humidity = VALUES(humidity), pressure = VALUES(pressure), vis = VALUES(vis),
            cloud = VALUES(cloud), update_time = VALUES(update_time)
        """
        try:
            cursor.execute(insert_query, values)
            print(f"{city} {fx_date} inserted/updated: {day.get('textDay')}, rows affected: {cursor.rowcount}")
            conn.commit()
            print(f"{city}: transaction committed.")
        except mysql.connector.Error as e:
            print(f"{city} {fx_date} database error: {e}")
            conn.rollback()
            print(f"{city}: transaction rolled back.")

def update_weather(force_update=False):
    conn = connect_db()
    cursor = conn.cursor()

    try:
        for city, location in city_codes.items():
            latest_time = get_latest_update_time(cursor, city)
            if should_update_data(latest_time, force_update):
                print(f"Updating weather data for {city}...")
                data = fetch_weather_data(city, location)
                if data:
                    store_weather_data(conn, cursor, city, data)
            else:
                print(f"{city} is already up to date; no update needed. Last update: {latest_time}")
    finally:
        # Release the connection even if one city fails
        cursor.close()
        conn.close()

def setup_scheduler():
    # Runs at 16:00 in the host's local time zone (16:00 PDT is 07:00 the next day in Beijing)
    schedule.every().day.at("16:00").do(update_weather)
    while True:
        schedule.run_pending()
        time.sleep(60)

if __name__ == "__main__":
    # Create the weather_data table on first run if it does not exist
    with mysql.connector.connect(**db_config) as conn:
        cursor = conn.cursor()
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS weather_data (
            id INT AUTO_INCREMENT PRIMARY KEY,
            city VARCHAR(50) NOT NULL COMMENT 'City name',
            fx_date DATE NOT NULL COMMENT 'Forecast date',
            sunrise TIME COMMENT 'Sunrise time',
            sunset TIME COMMENT 'Sunset time',
            moonrise TIME COMMENT 'Moonrise time',
            moonset TIME COMMENT 'Moonset time',
            moon_phase VARCHAR(20) COMMENT 'Moon phase name',
            moon_phase_icon VARCHAR(10) COMMENT 'Moon phase icon code',
            temp_max INT COMMENT 'Maximum temperature',
            temp_min INT COMMENT 'Minimum temperature',
            icon_day VARCHAR(10) COMMENT 'Daytime weather icon code',
            text_day VARCHAR(20) COMMENT 'Daytime weather description',
            icon_night VARCHAR(10) COMMENT 'Night-time weather icon code',
            text_night VARCHAR(20) COMMENT 'Night-time weather description',
            wind360_day INT COMMENT 'Daytime wind direction (0-360 degrees)',
            wind_dir_day VARCHAR(20) COMMENT 'Daytime wind direction',
            wind_scale_day VARCHAR(10) COMMENT 'Daytime wind scale',
            wind_speed_day INT COMMENT 'Daytime wind speed (km/h)',
            wind360_night INT COMMENT 'Night-time wind direction (0-360 degrees)',
            wind_dir_night VARCHAR(20) COMMENT 'Night-time wind direction',
            wind_scale_night VARCHAR(10) COMMENT 'Night-time wind scale',
            wind_speed_night INT COMMENT 'Night-time wind speed (km/h)',
            precip DECIMAL(5,1) COMMENT 'Precipitation (mm)',
            uv_index INT COMMENT 'UV index',
            humidity INT COMMENT 'Relative humidity (%)',
            pressure INT COMMENT 'Atmospheric pressure (hPa)',
            vis INT COMMENT 'Visibility (km)',
            cloud INT COMMENT 'Cloud cover (%)',
            update_time DATETIME COMMENT 'Data update time',
            UNIQUE KEY unique_city_date (city, fx_date)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='Weather data table'
        """)
        conn.commit()

    # Run one update immediately
    update_weather()

    # Start the scheduler
    setup_scheduler()

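Section summary

This section walked through the full SmartVoyage weather data pipeline, from collecting the data to refreshing it in the database on a schedule.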