抓取留档B站热搜

结城 工具箱 5 次阅读 828 字 发布于 2026-05-16 预计阅读时间: 4 分钟


import requests
import csv
from datetime import datetime
import os
import json

def fetch_bilibili_trending():
    """Fetch the current Bilibili trending-search ranking.

    Sends a GET request to the app trending endpoint (limit=30) and
    extracts the fields this script archives.

    Returns:
        dict | None: On success, a dict with the keys ``trackid``,
        ``list``, ``top_list`` and ``hotword_egg_info`` copied from the
        response's ``data`` object. ``None`` when the request fails, the
        body is not valid JSON, the API reports a non-zero ``code``, or
        the payload does not have the expected object shape.
    """
    url = "https://app.bilibili.com/x/v2/search/trending/ranking?limit=30"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Referer": "https://www.bilibili.com/"
    }

    # Keep the try body limited to the calls that can actually raise the
    # exceptions handled below.
    try:
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
    except json.JSONDecodeError as e:
        # Must come before RequestException: recent versions of requests
        # raise a decode error that subclasses both, and the more specific
        # message is the useful one.
        print(f"解析JSON失败: {e}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"请求API失败: {e}")
        return None

    # A syntactically valid body may still be something other than an object.
    if not isinstance(data, dict):
        print("API返回数据格式异常: 响应不是JSON对象")
        return None

    if data.get("code") != 0:
        print(f"API返回错误: code={data.get('code')}, message={data.get('message')}")
        return None

    # "data" can be missing or null even when code == 0; guard against it
    # instead of crashing with an uncaught TypeError/KeyError.
    payload = data.get("data")
    if not isinstance(payload, dict):
        print("API返回数据格式异常: 缺少data字段")
        return None

    return {
        "trackid": payload.get("trackid", ""),
        "list": payload.get("list", []),
        "top_list": payload.get("top_list", []),
        "hotword_egg_info": payload.get("hotword_egg_info", "")
    }

def get_existing_records(filepath):
    """Read the de-duplication keys already stored in a daily CSV file.

    Args:
        filepath: Path of the CSV file to scan. It must have a header row
            containing the ``record_time`` and ``keyword`` columns.

    Returns:
        set[str]: One ``"<record_time>_<keyword>"`` key per data row, or an
        empty set when the file does not exist.

    Raises:
        KeyError: If a row lacks the ``record_time`` or ``keyword`` column.
    """
    existing = set()
    if not os.path.exists(filepath):
        return existing

    # newline="" lets the csv module do its own line handling, so quoted
    # fields containing line breaks are read back exactly as they were
    # written (the writer side also opens the file with newline="").
    with open(filepath, "r", newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            existing.add(f"{row['record_time']}_{row['keyword']}")
    return existing

def _build_trending_row(trackid, record_time, entry_type, item, *, pinned):
    """Build one CSV row, in header column order, for a trending entry.

    Pinned entries carry no ``position`` or ``is_commercial`` value, so
    those two columns are written as empty strings for them.
    """
    return [
        trackid,
        record_time,
        entry_type,
        "" if pinned else item.get("position"),
        item.get("keyword", ""),
        item.get("show_name", ""),
        item.get("word_type", 0),
        item.get("icon", ""),
        item.get("hot_id", 0),
        "" if pinned else item.get("is_commercial", ""),
        item.get("resource_id", 0),
        # fetch_time: the moment this row is produced.
        datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    ]


def save_to_daily_csv(data):
    """Append the fetched trending entries to today's CSV file.

    The file lives at ``daily_data/bilibili_trending_<YYYYMMDD>.csv``; the
    directory is created on demand and a header row is written when the
    file is new or empty. Entries whose ``"<record_time>_<keyword>"`` key
    is already present in the file are skipped.

    Args:
        data: Dict with ``trackid``, ``list`` and ``top_list`` keys, as
            returned by the fetch step. ``list`` / ``top_list`` may be
            missing or ``None``.

    Returns:
        bool: ``True`` when the file was written, ``False`` when ``data``
        is empty or any error occurred while saving.
    """
    if not data:
        return False

    try:
        os.makedirs("daily_data", exist_ok=True)

        # Sample the clock once so the file date and record_time cannot
        # disagree when the run straddles midnight.
        now = datetime.now()
        today = now.strftime("%Y%m%d")
        record_time = now.strftime("%Y-%m-%d %H:%M:%S")
        filepath = f"daily_data/bilibili_trending_{today}.csv"

        existing_records = get_existing_records(filepath)

        # An existing but empty file still needs its header row.
        needs_header = (
            not os.path.exists(filepath) or os.path.getsize(filepath) == 0
        )

        trackid = data.get("trackid", "")
        # "or []" also covers keys that are present with a null value.
        sections = (
            ("普通热搜", data.get("list") or [], False),
            ("置顶热搜", data.get("top_list") or [], True),
        )

        new_records_count = 0
        # Append mode creates the file when it does not exist yet.
        with open(filepath, "a", newline="", encoding="utf-8-sig") as f:
            writer = csv.writer(f)

            if needs_header:
                writer.writerow([
                    "trackid", "record_time", "type", "position",
                    "keyword", "show_name", "word_type", "icon",
                    "hot_id", "is_commercial", "resource_id", "fetch_time"
                ])

            for entry_type, items, pinned in sections:
                for item in items:
                    record_key = f"{record_time}_{item.get('keyword', '')}"
                    if record_key in existing_records:
                        continue
                    writer.writerow(
                        _build_trending_row(
                            trackid, record_time, entry_type, item,
                            pinned=pinned,
                        )
                    )
                    new_records_count += 1

        print(f"已追加 {new_records_count} 条新记录到 {filepath}")
        return True

    except Exception as e:
        # Top-level boundary for the save step: report and signal failure
        # to the caller instead of aborting the script.
        print(f"保存CSV失败: {e}")
        return False

def main():
    """Run one fetch-and-archive cycle and report the outcome on stdout."""
    started_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"{started_at} 开始获取B站热搜数据...")

    trending_data = fetch_bilibili_trending()
    if not trending_data:
        # Nothing to save when the fetch step failed or returned no data.
        print("获取热搜数据失败")
        return

    saved = save_to_daily_csv(trending_data)
    print("数据保存成功" if saved else "数据保存失败")

# Run the scraper only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

配套的 MySQL 数据库表结构(除自增主键 id 外,字段与上述 CSV 表头一一对应)

-- Create the database (if absent) with the utf8mb4 character set and the
-- utf8mb4_unicode_ci collation.
CREATE DATABASE IF NOT EXISTS bilibili_trending 
CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

-- Make it the default database for the statements that follow.
USE bilibili_trending;

-- One row per archived trending entry. Apart from the surrogate key `id`,
-- the columns mirror the CSV header written by the scraper, in the same order.
-- The two timestamp columns are stored as text, matching the
-- 'YYYY-MM-DD HH:MM:SS' strings the scraper writes.
CREATE TABLE IF NOT EXISTS trending_records (
    id INT AUTO_INCREMENT PRIMARY KEY,
    trackid VARCHAR(64) COMMENT '追踪ID',
    record_time VARCHAR(40) COMMENT '记录时间',
    type VARCHAR(20) COMMENT '热搜类型(普通热搜/置顶热搜)',
    position INT COMMENT '排名位置',
    keyword VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '关键词',
    show_name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '显示名称',
    word_type INT COMMENT '词类型',
    icon VARCHAR(512) COMMENT '图标URL',
    hot_id INT COMMENT '热度ID',
    is_commercial VARCHAR(10) COMMENT '是否商业',
    resource_id INT COMMENT '资源ID',
    -- No comma after the last column definition: a trailing comma before
    -- the closing parenthesis is a syntax error in MySQL.
    fetch_time VARCHAR(40) COMMENT '抓取时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;