Files
FableFlow/02_Memory/story_monitor.py

237 lines
8.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
《合金经书:西行代码》故事监控脚本
动态监控故事发展,辅助记忆基座更新
"""
import os
import time
import json
from datetime import datetime
from pathlib import Path
class StoryMonitor:
def __init__(self, project_root="h:\\学习资料\\write\\1stnovel"):
self.project_root = Path(project_root)
self.story_dir = self.project_root / "03_story"
self.memory_dir = self.project_root / "02_memory"
self.memory_file = self.memory_dir / "记忆基座.md"
# 监控状态记录
self.last_modified = {}
self.update_log = []
def scan_story_files(self):
"""扫描故事目录,返回所有章节文件"""
story_files = []
for file_path in self.story_dir.glob("*.md"):
if file_path.name.startswith("") and "" in file_path.name:
story_files.append(file_path)
return sorted(story_files)
def get_chapter_info(self, file_path):
"""获取章节基本信息"""
filename = file_path.name
# 提取章节编号
if "第1章" in filename:
return 1, "第1章_黑风号的末班车"
elif "第2章" in filename:
return 2, "第2章_逃出新长安"
elif "第3章" in filename:
return 3, "第3章_贫民窟的黑市"
# 可以继续添加更多章节识别
return None, filename
def check_for_updates(self):
"""检查故事文件是否有更新"""
updates = []
story_files = self.scan_story_files()
for file_path in story_files:
current_mtime = file_path.stat().st_mtime
filename = file_path.name
if filename not in self.last_modified or current_mtime > self.last_modified[filename]:
chapter_num, chapter_name = self.get_chapter_info(file_path)
updates.append({
'filename': filename,
'chapter_num': chapter_num,
'chapter_name': chapter_name,
'modified_time': datetime.fromtimestamp(current_mtime),
'file_size': file_path.stat().st_size
})
self.last_modified[filename] = current_mtime
return updates
def analyze_chapter_content(self, file_path):
"""分析章节内容,提取关键信息"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
analysis = {
'character_interactions': [],
'new_items': [],
'key_events': [],
'location_changes': []
}
# 简单的关键词分析(可以进一步扩展)
lines = content.split('\n')
for line in lines:
line = line.strip()
if not line:
continue
# 检测角色互动
if any(name in line for name in ['石行者', '老猪', '沙静', '唐藏']):
analysis['character_interactions'].append(line[:100]) # 截取前100字符
# 检测新物品
if any(keyword in line for keyword in ['获得', '找到', '发现', '装备']):
analysis['new_items'].append(line[:100])
# 检测关键事件
if any(keyword in line for keyword in ['战斗', '冲突', '合作', '逃离']):
analysis['key_events'].append(line[:100])
# 检测地点变化
if any(keyword in line for keyword in ['到达', '进入', '离开', '隧道', '城市']):
analysis['location_changes'].append(line[:100])
return analysis
except Exception as e:
print(f"分析章节内容时出错: {e}")
return None
def generate_update_report(self, updates):
"""生成更新报告"""
if not updates:
return "没有检测到更新"
report = f"# 故事更新报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
for update in updates:
report += f"## 检测到更新: {update['chapter_name']}\n"
report += f"- **文件**: {update['filename']}\n"
report += f"- **修改时间**: {update['modified_time']}\n"
report += f"- **文件大小**: {update['file_size']} 字节\n"
# 分析内容
file_path = self.story_dir / update['filename']
analysis = self.analyze_chapter_content(file_path)
if analysis:
report += f"- **角色互动**: {len(analysis['character_interactions'])}\n"
report += f"- **新物品**: {len(analysis['new_items'])}\n"
report += f"- **关键事件**: {len(analysis['key_events'])}\n"
report += f"- **地点变化**: {len(analysis['location_changes'])}\n"
report += "\n"
return report
def save_update_log(self, report):
"""保存更新日志"""
log_file = self.memory_dir / "更新日志.md"
# 如果日志文件不存在,创建它
if not log_file.exists():
with open(log_file, 'w', encoding='utf-8') as f:
f.write("# 记忆基座更新日志\n\n")
# 追加新的报告
with open(log_file, 'a', encoding='utf-8') as f:
f.write(report)
f.write("\n" + "="*50 + "\n\n")
def monitor_loop(self, interval=60):
"""监控循环"""
print(f"开始监控故事目录: {self.story_dir}")
print(f"监控间隔: {interval}")
print("按 Ctrl+C 停止监控\n")
try:
while True:
updates = self.check_for_updates()
if updates:
report = self.generate_update_report(updates)
print(report)
self.save_update_log(report)
# 提示用户需要更新记忆基座
print("⚠️ 检测到故事更新,请及时更新记忆基座!")
print(" 运行: python update_memory.py")
else:
print(f"[{datetime.now().strftime('%H:%M:%S')}] 监控中... 未检测到更新")
time.sleep(interval)
except KeyboardInterrupt:
print("\n监控已停止")
except Exception as e:
print(f"监控过程中出错: {e}")
def main():
"""主函数"""
monitor = StoryMonitor()
# 检查目录是否存在
if not monitor.story_dir.exists():
print(f"错误: 故事目录不存在: {monitor.story_dir}")
return
if not monitor.memory_dir.exists():
print(f"错误: 记忆目录不存在: {monitor.memory_dir}")
return
# 显示当前故事状态
story_files = monitor.scan_story_files()
print(f"发现 {len(story_files)} 个故事章节:")
for file_path in story_files:
chapter_num, chapter_name = monitor.get_chapter_info(file_path)
mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
print(f" - {chapter_name} (最后修改: {mtime})")
print("\n选择操作:")
print("1. 启动持续监控")
print("2. 单次检查更新")
print("3. 生成当前状态报告")
choice = input("请输入选择 (1-3): ").strip()
if choice == "1":
interval = input("输入监控间隔(秒默认60): ").strip()
interval = int(interval) if interval.isdigit() else 60
monitor.monitor_loop(interval)
elif choice == "2":
updates = monitor.check_for_updates()
report = monitor.generate_update_report(updates)
print(report)
monitor.save_update_log(report)
elif choice == "3":
# 生成详细的状态报告
story_files = monitor.scan_story_files()
print(f"\n# 当前故事状态报告")
print(f"总章节数: {len(story_files)}")
print(f"最后更新: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("\n各章节状态:")
for file_path in story_files:
chapter_num, chapter_name = monitor.get_chapter_info(file_path)
analysis = monitor.analyze_chapter_content(file_path)
print(f"\n## {chapter_name}")
if analysis:
print(f"- 角色互动: {len(analysis['character_interactions'])}")
print(f"- 关键事件: {len(analysis['key_events'])}")
print(f"- 物品变化: {len(analysis['new_items'])}")
else:
print("无效选择")
if __name__ == "__main__":
main()