做一个galgame猜猜乐游戏!
最近被毕设AIGC查重折磨得够呛,来做个小玩具梳理一下心情.jpg 目标是参考https://anime-character-guessr.netlify.app/singleplayer做一个猜galgame的版本 感觉做猜人物会很麻烦,先从猜游戏名开始吧! :::tip 提示 太菜了,这个game摸了 :::
数据清理
第一步比较麻烦的是数据处理部分 参照的数据源毫无疑问是bangumi~ 原以为直接参照bangumi api文档去检索就可以了 然而主要问题在数据处理部分,bangumi没有对游戏进行整合,对于白色相簿2这种一个系列好几部的作品是分开来陈列的,同时如果想要roll一个游戏出来的话,bangumi的api也没有提供范围检索的功能
所以做法还是先用爬虫把bangumi的galgame数据全部爬下来,然后做处理
第一步是获取所有的galgame数据 参照了bangumi的app,用HTML解析把galgame tag的页面全爬下来做处理,这里的鉴权是用的cookie,从web里面拷贝一下就好了 不加也能获取到列表,但是NSFW内容的封面没法正常获取。
import requests
from bs4 import BeautifulSoup
import json
import time
import html
# Listing URL template: Galgame tag sorted by rank, '{}' is the page number.
BASE_URL = 'https://bgm.tv/game/tag/Galgame/?sort=rank&page={}'
# Browser-like headers; the cookie (copied from a logged-in web session)
# is required for NSFW covers to load — without it the list still works.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
    'cookie': 'your_cookie_here',
}
def fetch_page(page=1):
    """Fetch one listing page of the Galgame tag ranking.

    Args:
        page: 1-based page number of the ranked listing.

    Returns:
        The page HTML decoded as UTF-8.

    Raises:
        requests.HTTPError: if the server answers with an error status.
        requests.Timeout: if the request exceeds the timeout.
    """
    url = BASE_URL.format(page)
    # Timeout added so one stalled connection cannot hang the whole crawl.
    resp = requests.get(url, headers=HEADERS, timeout=30)
    resp.raise_for_status()
    # Decode explicitly as UTF-8 instead of trusting the detected charset.
    return resp.content.decode('utf-8')
def parse_games(html_str):
    """Parse one bgm.tv tag listing page into a list of game dicts.

    Args:
        html_str: HTML of a listing page as returned by fetch_page().

    Returns:
        A list of dicts with keys id/cover/name/nameCn/tip/score/total;
        empty if the item list element is missing.
    """
    soup = BeautifulSoup(html_str, 'html.parser')
    items = []
    ul = soup.find('ul', {'id': 'browserItemList'})
    if not ul:
        return items
    for li in ul.find_all('li', class_='item'):
        # Subject id: extracted from the title link's '/subject/<id>' href.
        a = li.find('h3').find('a', class_='l') if li.find('h3') else None
        id_ = a['href'] if a and a.has_attr('href') else ''
        if id_.startswith('/subject/'):
            id_ = id_.replace('/subject/', '')
        # Cover image: try a > span > img first.
        cover = ''
        a_tag = li.find('a')
        if a_tag:
            span = a_tag.find('span')
            if span:
                img = span.find('img')
                if img and img.has_attr('src'):
                    cover = img['src']
            # Fall back to a > noscript > img (lazy-loaded covers).
            if not cover:
                noscript = a_tag.find('noscript')
                if noscript:
                    soup_ns = BeautifulSoup(noscript.decode_contents(), 'html.parser')
                    img = soup_ns.find('img')
                    if img and img.has_attr('src'):
                        cover = img['src']
        # Drop placeholder images and make relative URLs absolute.
        if cover in ['/img/info_only.png', '/img/no_icon_subject.png']:
            cover = ''
        elif cover.startswith('//'):
            # BUGFIX: protocol-relative URLs must be handled BEFORE the
            # single-slash case, otherwise '//lain.bgm.tv/...' was mangled
            # into 'https://bgm.tv//lain.bgm.tv/...'.
            cover = 'https:' + cover
        elif cover.startswith('/'):
            cover = 'https://bgm.tv' + cover
        # Original (Japanese) title.
        name = li.find('small', class_='grey')
        name = html.unescape(name.text.strip()) if name else ''
        name_cn = html.unescape(a.text.strip()) if a else ''
        # Rating score.
        score = li.find('small', class_='fade')
        score = html.unescape(score.text.strip()) if score else ''
        # Total rater count, e.g. '(2841人评分)'.
        total = li.find('span', class_='tip_j')
        total = html.unescape(total.text.strip()) if total else ''
        # Short info line (date / platform / genre / maker).
        tip = li.find('p', class_='info tip')
        tip = html.unescape(tip.text.strip()) if tip else ''
        items.append({
            'id': id_,
            'cover': cover,
            'name': name,
            'nameCn': name_cn,
            'tip': tip,
            'score': score,
            'total': total,
        })
    return items
def get_total_pages(html):
    """Return the total page count parsed from the pager's '( x / y )' span.

    Falls back to 1 when the pager edge span is missing or unparseable.
    """
    soup = BeautifulSoup(html, 'html.parser')
    edge = soup.find('span', class_='p_edge')
    if not (edge and '/' in edge.text):
        return 1
    # Text looks like '( 3 / 127 )' — take what follows the last slash.
    tail = edge.text.rsplit('/', 1)[-1]
    try:
        return int(tail.replace(')', '').strip())
    except Exception:
        return 1
def main():
    """Crawl every listing page of the ranking and dump all rows to JSON."""
    all_data = []
    first_html = fetch_page(1)
    total_pages = get_total_pages(first_html)
    print(f'共 {total_pages} 页')
    all_data.extend(parse_games(first_html))
    for page in range(2, total_pages + 1):
        print(f'正在抓取第 {page} 页...')
        # Named 'page_html' (not 'html') so the stdlib 'html' module import
        # used by parse_games is not shadowed inside this function.
        page_html = fetch_page(page)
        all_data.extend(parse_games(page_html))
        time.sleep(1)  # rate-limit between pages to avoid getting banned
    # Make sure the output directory exists before writing.
    import os
    os.makedirs('data', exist_ok=True)
    with open('data/bgm_galgame_rank.json', 'w', encoding='utf-8') as f:
        json.dump(all_data, f, ensure_ascii=False, indent=2)
    print('抓取完成,已保存到 data/bgm_galgame_rank.json')
if __name__ == '__main__':
    main()后来发现没必要这样,bangumi的api文档里是有这个接口的,路径是/v0/search/subjects 可能app做的时候还没有这个接口 这个接口获取到的数据格式是
 {
    "id": "226254",
    "cover": "https://bgm.tv//lain.bgm.tv/pic/cover/c/13/7b/226254_UNFNa.jpg",
    "name": "ランス10",
    "nameCn": "兰斯10 决战",
    "tip": "2018-02-23 / PC (Windows 7/8/10) / 大型战争RPG / ALICESOFT",
    "score": "9.3",
    "total": "(2841人评分)"
  },第二步是获取到游戏的每个具体数据,这里需要调用/v0/subjects/{subject_id}、/v0/subjects/{subject_id}/persons、/v0/subjects/{subject_id}/characters、/v0/subjects/{subject_id}/subjects好几个接口 persons接口是获取制作人员信息的,subjects则是获取相关条目 这里用ai帮我们写一下脚本,这里筛了几个不是很重要的信息,比如summary 这里的鉴权是用的token,api文档里面有对应的获取链接,不加的话无法访问nsfw内容
import requests
import json
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
# Shared API request headers; the Bearer token (obtainable via the link in
# the Bangumi API docs) is required to access NSFW entries.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36',
    'Authorization': 'Bearer your_token_here',
}
def _fetch_json(url):
    """GET *url* with the shared API headers and return the decoded JSON.

    A timeout is set so one stalled request cannot hang a worker thread
    forever. Raises requests.HTTPError on a non-2xx response.
    """
    resp = requests.get(url, headers=HEADERS, timeout=30)
    resp.raise_for_status()
    return resp.json()


def fetch_subject_by_id(subject_id):
    """Fetch the base subject record for *subject_id*."""
    return _fetch_json(f'https://api.bgm.tv/v0/subjects/{subject_id}')


def fetch_subject_persons(subject_id):
    """Fetch the staff (persons) list for *subject_id*."""
    return _fetch_json(f'https://api.bgm.tv/v0/subjects/{subject_id}/persons')


def fetch_subject_characters(subject_id):
    """Fetch the character list for *subject_id*."""
    return _fetch_json(f'https://api.bgm.tv/v0/subjects/{subject_id}/characters')


def fetch_subject_relations(subject_id):
    """Fetch the related-subjects list for *subject_id*."""
    return _fetch_json(f'https://api.bgm.tv/v0/subjects/{subject_id}/subjects')
def filter_subject(subject):
    """Strip bulky, unused fields from a subject dict (mutates in place).

    Args:
        subject: subject dict from the /v0/subjects/{id} endpoint.

    Returns:
        The same dict, for call-chaining convenience.
    """
    subject.pop("summary", None)
    subject.pop("rating", None)
    subject.pop("collection", None)
    subject.pop("total_episodes", None)
    return subject
def filter_persons(persons):
    """Drop image URLs and episode lists from each person (mutates in place).

    Args:
        persons: list of person dicts from the /persons endpoint.

    Returns:
        The same list, with 'images' and 'eps' removed from every entry.
    """
    for entry in persons:
        for unwanted in ("images", "eps"):
            entry.pop(unwanted, None)
    return persons
def fetch_subjects_allinfo_batch(rank_json_path='data/bgm_galgame_rank.json',
                                 out_json_prefix='data/bgm_galgame_subjects_allinfo_',
                                 batch_size=10, max_workers=10, start=0):
    """Fetch full info for every subject id listed in *rank_json_path*.

    Results are written in batches to numbered JSON files named
    '<out_json_prefix><batch_no>.json'.

    Args:
        rank_json_path: JSON file produced by the ranking crawler.
        out_json_prefix: path prefix for the per-batch output files.
        batch_size: number of subjects written per output file.
        max_workers: thread-pool size for concurrent API calls.
        start: index to resume from. BUGFIX: the original code hard-coded
            950 here (a leftover resume offset), silently skipping the
            first 950 entries on a fresh run.
    """
    with open(rank_json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    ids = [item['id'] for item in data]
    total = len(ids)

    def fetch_one(subject_id):
        """Collect subject/persons/characters/relations for one id."""
        subject_info = {}
        try:
            subject = fetch_subject_by_id(subject_id)
            subject_info['subject'] = filter_subject(subject)
            persons = fetch_subject_persons(subject_id)
            subject_info['persons'] = filter_persons(persons)
            subject_info['characters'] = fetch_subject_characters(subject_id)
            subject_info['relations'] = fetch_subject_relations(subject_id)
        except Exception as e:
            # Best-effort: log and return an empty dict, skipped below.
            print(f'获取条目 {subject_id} 相关信息失败: {e}')
        return subject_info

    for i in range(start, total, batch_size):
        batch_ids = ids[i:i + batch_size]
        result = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_id = {executor.submit(fetch_one, sid): sid for sid in batch_ids}
            for future in as_completed(future_to_id):
                info = future.result()
                if info:  # drop entries whose every request failed
                    result.append(info)
        out_json_path = f'{out_json_prefix}{i//batch_size+1}.json'
        with open(out_json_path, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        print(f'第{i//batch_size+1}批(共{len(batch_ids)}条)已保存到 {out_json_path}')
if __name__ == '__main__':
    fetch_subjects_allinfo_batch()这样子就获取到了详细信息,很长一串,简化过的示例是
  {
    "subject": {
      "date": "2002-08-29",
      "platform": "游戏",
      "images": {
        "small": "https://lain.bgm.tv/r/200/pic/cover/l/ff/ee/1126_wtoHO.jpg",
      },
      "name": "Ever17 -the out of infinity-",
      "name_cn": "时空轮回",
      "tags": [
        {
          "name": "Galgame",
          "count": 2391,
          "total_cont": 0
        },
        // ...
      ],
      "infobox": [
        {
          "key": "中文名",
          "value": "时空轮回"
        },
        // ...
      ],
      "id": 1126,
      "eps": 0,
      "meta_tags": [
        "PSP",
      ],
      "volumes": 0,
      "series": false,
      "locked": false,
      "nsfw": false,
      "type": 4,
      "score": "9.1",
      "total": "(8227人评分)"
    },
    "persons": [
      {
        "name": "輿水隆之",
        "relation": "作画监督",
        "career": [
          "illustrator"
        ],
        "type": 1,
        "id": 9803
      },
    ],
    "characters": [
      {
        "images": {
          "small": "https://lain.bgm.tv/r/100/pic/crt/l/47/85/11716_crt_L0bC9.jpg",
        },
        "name": "倉成武",
        "relation": "主角",
        "actors": [
          {
            "images": {
              "small": "https://lain.bgm.tv/r/100/pic/crt/l/7e/7d/3884_prsn_R7RZ7.jpg?r=1657125754",
            },
            "name": "保志総一朗",
            "career": [
              "artist",
              "seiyu"
            ],
            "id": 3884,
            "type": 1,
            "locked": false
          }
        ],
        "type": 1,
        "id": 11716
      },
      // ...
    ],
    "relations": [
      {
        "images": {
          "small": "https://lain.bgm.tv/r/200/pic/cover/l/ba/66/51969_jp.jpg",
        },
        "name": "Ever17 ボーカルコレクション",
        "name_cn": "Ever17 人声精选集",
        "relation": "角色歌",
        "type": 3,
        "id": 51969
      },
      // ...
    ]
  }一下子爬出来有85w行,但我的mba居然还在硬刚它,虽然卡的不行。
所有数据都获取到了,接下来是改造成我们需要的形状 首先是relations游戏的合并,我们希望把同一个系列的合并到一起 这里类似于并查集的逻辑合并起来,在数据处理上,我们只认relations中的以下几个属性
valid_relation_types = {
        '不同演绎', '前传', '续集', '同一系列',
        '改编', '主线故事', '番外', '重制'
    }然后尝试把同一个系列的合并起来
#!/usr/bin/env python3
import json
import os
import sys
from collections import defaultdict
from typing import Dict, List, Set, Any, Optional, Tuple, DefaultDict
class UnionFind:
    """Disjoint-set forest with path compression and union by rank."""

    def __init__(self):
        # Nodes are registered lazily on their first find().
        self.parent: Dict[int, int] = {}
        self.rank: Dict[int, int] = {}

    def find(self, x: int) -> int:
        """Return the root of x's set, registering x if unseen.

        Compresses the path so subsequent lookups are near O(1).
        """
        if x not in self.parent:
            self.parent[x] = x
            self.rank[x] = 0
            return x
        # Locate the root first.
        root = x
        while self.parent[root] != root:
            root = self.parent[root]
        # Then point every node on the path directly at the root.
        while self.parent[x] != root:
            self.parent[x], x = root, self.parent[x]
        return root

    def union(self, x: int, y: int) -> None:
        """Merge the sets containing x and y (no-op if already merged)."""
        rx, ry = self.find(x), self.find(y)
        if rx == ry:
            return
        # Attach the shallower tree under the deeper one; on a tie the
        # first argument's root wins and its rank grows by one.
        if self.rank[rx] < self.rank[ry]:
            rx, ry = ry, rx
        self.parent[ry] = rx
        if self.rank[rx] == self.rank[ry]:
            self.rank[rx] += 1
def extract_year(date_str: Optional[str]) -> int:
    """Extract a year from assorted date formats.

    Handles 'YYYY-MM-DD', 'YYYY年M月', 'YYYY/MM/DD' and bare 'YYYY'
    strings. Returns 9999 for missing or unparseable input so that
    unknown dates sort last.
    """
    if not date_str:
        return 9999
    # Separators are tried in the same priority order as before:
    # hyphen, then the CJK year marker, then slash.
    for sep in ('-', '年', '/'):
        if sep in date_str:
            try:
                return int(date_str.split(sep)[0])
            except (ValueError, IndexError):
                return 9999
    prefix = date_str[:4]
    if len(date_str) >= 4 and prefix.isdigit():
        return int(prefix)
    return 9999
def process_relations(input_file: Optional[str] = None, output_file: Optional[str] = None) -> Dict[str, Any]:
    """Group related games into series via a union-find over their relations.

    Reads the crawled all-info JSON, unions any two game subjects linked by
    a whitelisted relation type, and writes out every group with more than
    one member, largest group first.

    Args:
        input_file: input path; defaults to jsonData/bgm_galgame_subjects_allinfo.json
            next to this script.
        output_file: output path; defaults to jsonData/bgm_galgame_series.json
            next to this script.

    Returns:
        On success: {'success': True, 'group_count', 'total_games',
        'valid_relations', 'total_relations', 'relation_types'}.
        On an I/O failure: {'success': False, 'error': <message>}.
    """
    # Resolve default paths relative to this script's directory.
    current_dir = os.path.dirname(os.path.abspath(__file__))
    if input_file is None:
        input_file = os.path.join(current_dir, 'jsonData', 'bgm_galgame_subjects_allinfo.json')
    if output_file is None:
        output_file = os.path.join(current_dir, 'jsonData', 'bgm_galgame_series.json')
    # Load the crawled data; any read/parse failure is reported, not raised.
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception as e:
        return {'success': False, 'error': str(e)}
    # Union-find over subject ids, plus id -> record lookup tables.
    uf = UnionFind()
    id_to_subject: Dict[int, Dict[str, Any]] = {}
    id_to_item: Dict[int, Dict[str, Any]] = {}
    # Relation types that may merge two games into one series; the
    # catch-all "其他" ("other") type is deliberately excluded.
    valid_relation_types = {
        '不同演绎', '前传', '续集', '同一系列',
        '改编', '主线故事', '番外', '重制'
    }
    # Step 1: map every subject id to its subject/item record.
    for item in data:
        if 'subject' in item and 'id' in item['subject']:
            subject_id = item['subject']['id']
            id_to_subject[subject_id] = item['subject']
            id_to_item[subject_id] = item
    # Step 2: walk all relations and union linked game subjects.
    relation_count = 0
    relation_types: DefaultDict[str, int] = defaultdict(int)
    valid_relation_count = 0
    for item in data:
        if ('subject' in item and 'id' in item['subject'] and
            'relations' in item and item['relations']):
            source_id = item['subject']['id']
            for relation in item['relations']:
                if 'id' not in relation:
                    continue
                target_id = relation['id']
                relation_type = relation.get('relation', '未知关系')
                relation_types[relation_type] += 1
                # Count every relation seen, valid or not (for the stats).
                relation_count += 1
                # Merge only when:
                # 1. the related subject is a game (type == 4), and
                # 2. the relation type is in the whitelist above.
                if (relation.get('type') == 4 and relation_type in valid_relation_types):
                    uf.union(source_id, target_id)
                    valid_relation_count += 1
    # Step 3: bucket subjects by their union-find root.
    merged_groups: DefaultDict[int, Dict[str, Any]] = defaultdict(
        lambda: {'ids': [], 'games': []}
    )
    # Assign every id to the group of its root.
    for subject_id in id_to_subject:
        root_id = uf.find(subject_id)
        merged_groups[root_id]['ids'].append(subject_id)
    # Collect per-game display info for each group.
    for root_id, group in merged_groups.items():
        for subject_id in group['ids']:
            if subject_id in id_to_subject:
                subject = id_to_subject[subject_id]
                # Keep only the fields the guessing game needs.
                game_info = {
                    'id': subject['id'],
                    'name': subject['name'],
                    'name_cn': subject.get('name_cn', subject['name']),
                    'date': subject.get('date', ''),
                    'year': extract_year(subject.get('date', '')),
                    'meta_tags': subject.get('meta_tags', []),
                    'score': subject.get('score', None),
                    'type': subject.get('type', None),
                    'platform': [],
                    'infobox': []
                }
                # Pull platform names from the infobox "平台" entry; its
                # value may be a plain string or a list of {'v': ...} dicts.
                if 'infobox' in subject:
                    for item in subject['infobox']:
                        if item.get('key') == '平台':
                            if isinstance(item.get('value'), list):
                                for platform in item['value']:
                                    if isinstance(platform, dict) and 'v' in platform:
                                        game_info['platform'].append(platform['v'])
                                    elif isinstance(platform, str):
                                        game_info['platform'].append(platform)
                            elif isinstance(item.get('value'), str):
                                game_info['platform'].append(item['value'])
                        # Keep only the key infobox entries.
                        if item.get('key') in ['中文名', '游戏类型', '发行日期', '开发', '发行']:
                            game_info['infobox'].append(item)
                group['games'].append(game_info)
    # Sort each group's games: year ascending, then score descending.
    for group in merged_groups.values():
        # Missing/None scores are treated as 0 by the 'or 0' fallback.
        group['games'].sort(key=lambda x: (x.get('year', 9999), -(float(x.get('score', 0) or 0))))
    # Drop groups that contain only a single game.
    filtered_groups = [group for group in merged_groups.values() if len(group['ids']) > 1]
    # Largest series first.
    result = sorted(filtered_groups, key=lambda x: len(x['ids']), reverse=True)
    # Write the merged series; failures are reported, not raised.
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
    except Exception as e:
        return {'success': False, 'error': str(e)}
    return {
        'success': True,
        'group_count': len(result),
        'total_games': len(id_to_subject),
        'valid_relations': valid_relation_count,
        'total_relations': relation_count,
        'relation_types': dict(relation_types)
    }
def main():
    """Command-line entry point: parse args and run the series merge."""
    import argparse
    parser = argparse.ArgumentParser(description='处理Bangumi游戏关系数据,合并相关游戏')
    parser.add_argument('-i', '--input', help='输入文件路径')
    parser.add_argument('-o', '--output', help='输出文件路径')
    args = parser.parse_args()
    result = process_relations(
        input_file=args.input,
        output_file=args.output
    )
    if not result['success']:
        # Surface the failure reason instead of exiting silently.
        print(f"处理失败: {result.get('error', '未知错误')}", file=sys.stderr)
        sys.exit(1)
    sys.exit(0)
if __name__ == '__main__':
    main()合并之后查看合完的信息,发现数据上有些问题,比如青空下的约定和帕露菲合一起去了 还有发现有的数据少了,比如初音岛的一和二没有
前者靠人工处理一下,后者的话重新抓一下数据。最开始是按照排名抓的,其中很多评分人数在一百上下的作品不会出现在排名列表里,所以会漏数据。