#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 《合金经书:西行代码》故事监控脚本 动态监控故事发展,辅助记忆基座更新 """ import os import time import json from datetime import datetime from pathlib import Path class StoryMonitor: def __init__(self, project_root="h:\\学习资料\\write\\1stnovel"): self.project_root = Path(project_root) self.story_dir = self.project_root / "03_story" self.memory_dir = self.project_root / "02_memory" self.memory_file = self.memory_dir / "记忆基座.md" # 监控状态记录 self.last_modified = {} self.update_log = [] def scan_story_files(self): """扫描故事目录,返回所有章节文件""" story_files = [] for file_path in self.story_dir.glob("*.md"): if file_path.name.startswith("第") and "章" in file_path.name: story_files.append(file_path) return sorted(story_files) def get_chapter_info(self, file_path): """获取章节基本信息""" filename = file_path.name # 提取章节编号 if "第1章" in filename: return 1, "第1章_黑风号的末班车" elif "第2章" in filename: return 2, "第2章_逃出新长安" elif "第3章" in filename: return 3, "第3章_贫民窟的黑市" # 可以继续添加更多章节识别 return None, filename def check_for_updates(self): """检查故事文件是否有更新""" updates = [] story_files = self.scan_story_files() for file_path in story_files: current_mtime = file_path.stat().st_mtime filename = file_path.name if filename not in self.last_modified or current_mtime > self.last_modified[filename]: chapter_num, chapter_name = self.get_chapter_info(file_path) updates.append({ 'filename': filename, 'chapter_num': chapter_num, 'chapter_name': chapter_name, 'modified_time': datetime.fromtimestamp(current_mtime), 'file_size': file_path.stat().st_size }) self.last_modified[filename] = current_mtime return updates def analyze_chapter_content(self, file_path): """分析章节内容,提取关键信息""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() analysis = { 'character_interactions': [], 'new_items': [], 'key_events': [], 'location_changes': [] } # 简单的关键词分析(可以进一步扩展) lines = content.split('\n') for line in lines: line = line.strip() if not line: continue # 检测角色互动 if any(name in line for name in ['石行者', '老猪', '沙静', '唐藏']): analysis['character_interactions'].append(line[:100]) # 截取前100字符 # 检测新物品 if any(keyword in line for keyword in ['获得', '找到', '发现', '装备']): analysis['new_items'].append(line[:100]) # 检测关键事件 if any(keyword in line for keyword in ['战斗', '冲突', '合作', '逃离']): analysis['key_events'].append(line[:100]) # 检测地点变化 if any(keyword in line for keyword in ['到达', '进入', '离开', '隧道', '城市']): analysis['location_changes'].append(line[:100]) return analysis except Exception as e: print(f"分析章节内容时出错: {e}") return None def generate_update_report(self, updates): """生成更新报告""" if not updates: return "没有检测到更新" report = f"# 故事更新报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" for update in updates: report += f"## 检测到更新: {update['chapter_name']}\n" report += f"- **文件**: {update['filename']}\n" report += f"- **修改时间**: {update['modified_time']}\n" report += f"- **文件大小**: {update['file_size']} 字节\n" # 分析内容 file_path = self.story_dir / update['filename'] analysis = self.analyze_chapter_content(file_path) if analysis: report += f"- **角色互动**: {len(analysis['character_interactions'])} 处\n" report += f"- **新物品**: {len(analysis['new_items'])} 个\n" report += f"- **关键事件**: {len(analysis['key_events'])} 个\n" report += f"- **地点变化**: {len(analysis['location_changes'])} 处\n" report += "\n" return report def save_update_log(self, report): """保存更新日志""" log_file = self.memory_dir / "更新日志.md" # 如果日志文件不存在,创建它 if not log_file.exists(): with open(log_file, 'w', encoding='utf-8') as f: f.write("# 记忆基座更新日志\n\n") # 追加新的报告 with open(log_file, 'a', encoding='utf-8') as f: f.write(report) f.write("\n" + "="*50 + "\n\n") def monitor_loop(self, interval=60): """监控循环""" print(f"开始监控故事目录: {self.story_dir}") print(f"监控间隔: {interval} 秒") print("按 Ctrl+C 停止监控\n") try: while True: updates = self.check_for_updates() if updates: report = self.generate_update_report(updates) print(report) self.save_update_log(report) # 提示用户需要更新记忆基座 print("⚠️ 检测到故事更新,请及时更新记忆基座!") print(" 运行: python update_memory.py") else: print(f"[{datetime.now().strftime('%H:%M:%S')}] 监控中... 未检测到更新") time.sleep(interval) except KeyboardInterrupt: print("\n监控已停止") except Exception as e: print(f"监控过程中出错: {e}") def main(): """主函数""" monitor = StoryMonitor() # 检查目录是否存在 if not monitor.story_dir.exists(): print(f"错误: 故事目录不存在: {monitor.story_dir}") return if not monitor.memory_dir.exists(): print(f"错误: 记忆目录不存在: {monitor.memory_dir}") return # 显示当前故事状态 story_files = monitor.scan_story_files() print(f"发现 {len(story_files)} 个故事章节:") for file_path in story_files: chapter_num, chapter_name = monitor.get_chapter_info(file_path) mtime = datetime.fromtimestamp(file_path.stat().st_mtime) print(f" - {chapter_name} (最后修改: {mtime})") print("\n选择操作:") print("1. 启动持续监控") print("2. 单次检查更新") print("3. 生成当前状态报告") choice = input("请输入选择 (1-3): ").strip() if choice == "1": interval = input("输入监控间隔(秒,默认60): ").strip() interval = int(interval) if interval.isdigit() else 60 monitor.monitor_loop(interval) elif choice == "2": updates = monitor.check_for_updates() report = monitor.generate_update_report(updates) print(report) monitor.save_update_log(report) elif choice == "3": # 生成详细的状态报告 story_files = monitor.scan_story_files() print(f"\n# 当前故事状态报告") print(f"总章节数: {len(story_files)}") print(f"最后更新: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print("\n各章节状态:") for file_path in story_files: chapter_num, chapter_name = monitor.get_chapter_info(file_path) analysis = monitor.analyze_chapter_content(file_path) print(f"\n## {chapter_name}") if analysis: print(f"- 角色互动: {len(analysis['character_interactions'])}") print(f"- 关键事件: {len(analysis['key_events'])}") print(f"- 物品变化: {len(analysis['new_items'])}") else: print("无效选择") if __name__ == "__main__": main()