Files
FableFlow/02_Memory/story_monitor.py

237 lines
8.9 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
合金经书西行代码故事监控脚本
动态监控故事发展辅助记忆基座更新
"""
import os
import time
import json
from datetime import datetime
from pathlib import Path
class StoryMonitor:
def __init__(self, project_root="h:\\学习资料\\write\\1stnovel"):
self.project_root = Path(project_root)
self.story_dir = self.project_root / "03_story"
self.memory_dir = self.project_root / "02_memory"
self.memory_file = self.memory_dir / "记忆基座.md"
# 监控状态记录
self.last_modified = {}
self.update_log = []
def scan_story_files(self):
"""扫描故事目录,返回所有章节文件"""
story_files = []
for file_path in self.story_dir.glob("*.md"):
if file_path.name.startswith("") and "" in file_path.name:
story_files.append(file_path)
return sorted(story_files)
def get_chapter_info(self, file_path):
"""获取章节基本信息"""
filename = file_path.name
# 提取章节编号
if "第1章" in filename:
return 1, "第1章_黑风号的末班车"
elif "第2章" in filename:
return 2, "第2章_逃出新长安"
elif "第3章" in filename:
return 3, "第3章_贫民窟的黑市"
# 可以继续添加更多章节识别
return None, filename
def check_for_updates(self):
"""检查故事文件是否有更新"""
updates = []
story_files = self.scan_story_files()
for file_path in story_files:
current_mtime = file_path.stat().st_mtime
filename = file_path.name
if filename not in self.last_modified or current_mtime > self.last_modified[filename]:
chapter_num, chapter_name = self.get_chapter_info(file_path)
updates.append({
'filename': filename,
'chapter_num': chapter_num,
'chapter_name': chapter_name,
'modified_time': datetime.fromtimestamp(current_mtime),
'file_size': file_path.stat().st_size
})
self.last_modified[filename] = current_mtime
return updates
def analyze_chapter_content(self, file_path):
"""分析章节内容,提取关键信息"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
analysis = {
'character_interactions': [],
'new_items': [],
'key_events': [],
'location_changes': []
}
# 简单的关键词分析(可以进一步扩展)
lines = content.split('\n')
for line in lines:
line = line.strip()
if not line:
continue
# 检测角色互动
if any(name in line for name in ['石行者', '老猪', '沙静', '唐藏']):
analysis['character_interactions'].append(line[:100]) # 截取前100字符
# 检测新物品
if any(keyword in line for keyword in ['获得', '找到', '发现', '装备']):
analysis['new_items'].append(line[:100])
# 检测关键事件
if any(keyword in line for keyword in ['战斗', '冲突', '合作', '逃离']):
analysis['key_events'].append(line[:100])
# 检测地点变化
if any(keyword in line for keyword in ['到达', '进入', '离开', '隧道', '城市']):
analysis['location_changes'].append(line[:100])
return analysis
except Exception as e:
print(f"分析章节内容时出错: {e}")
return None
def generate_update_report(self, updates):
"""生成更新报告"""
if not updates:
return "没有检测到更新"
report = f"# 故事更新报告 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
for update in updates:
report += f"## 检测到更新: {update['chapter_name']}\n"
report += f"- **文件**: {update['filename']}\n"
report += f"- **修改时间**: {update['modified_time']}\n"
report += f"- **文件大小**: {update['file_size']} 字节\n"
# 分析内容
file_path = self.story_dir / update['filename']
analysis = self.analyze_chapter_content(file_path)
if analysis:
report += f"- **角色互动**: {len(analysis['character_interactions'])}\n"
report += f"- **新物品**: {len(analysis['new_items'])}\n"
report += f"- **关键事件**: {len(analysis['key_events'])}\n"
report += f"- **地点变化**: {len(analysis['location_changes'])}\n"
report += "\n"
return report
def save_update_log(self, report):
"""保存更新日志"""
log_file = self.memory_dir / "更新日志.md"
# 如果日志文件不存在,创建它
if not log_file.exists():
with open(log_file, 'w', encoding='utf-8') as f:
f.write("# 记忆基座更新日志\n\n")
# 追加新的报告
with open(log_file, 'a', encoding='utf-8') as f:
f.write(report)
f.write("\n" + "="*50 + "\n\n")
def monitor_loop(self, interval=60):
"""监控循环"""
print(f"开始监控故事目录: {self.story_dir}")
print(f"监控间隔: {interval}")
print("按 Ctrl+C 停止监控\n")
try:
while True:
updates = self.check_for_updates()
if updates:
report = self.generate_update_report(updates)
print(report)
self.save_update_log(report)
# 提示用户需要更新记忆基座
print("⚠️ 检测到故事更新,请及时更新记忆基座!")
print(" 运行: python update_memory.py")
else:
print(f"[{datetime.now().strftime('%H:%M:%S')}] 监控中... 未检测到更新")
time.sleep(interval)
except KeyboardInterrupt:
print("\n监控已停止")
except Exception as e:
print(f"监控过程中出错: {e}")
def main():
"""主函数"""
monitor = StoryMonitor()
# 检查目录是否存在
if not monitor.story_dir.exists():
print(f"错误: 故事目录不存在: {monitor.story_dir}")
return
if not monitor.memory_dir.exists():
print(f"错误: 记忆目录不存在: {monitor.memory_dir}")
return
# 显示当前故事状态
story_files = monitor.scan_story_files()
print(f"发现 {len(story_files)} 个故事章节:")
for file_path in story_files:
chapter_num, chapter_name = monitor.get_chapter_info(file_path)
mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
print(f" - {chapter_name} (最后修改: {mtime})")
print("\n选择操作:")
print("1. 启动持续监控")
print("2. 单次检查更新")
print("3. 生成当前状态报告")
choice = input("请输入选择 (1-3): ").strip()
if choice == "1":
interval = input("输入监控间隔(秒默认60): ").strip()
interval = int(interval) if interval.isdigit() else 60
monitor.monitor_loop(interval)
elif choice == "2":
updates = monitor.check_for_updates()
report = monitor.generate_update_report(updates)
print(report)
monitor.save_update_log(report)
elif choice == "3":
# 生成详细的状态报告
story_files = monitor.scan_story_files()
print(f"\n# 当前故事状态报告")
print(f"总章节数: {len(story_files)}")
print(f"最后更新: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("\n各章节状态:")
for file_path in story_files:
chapter_num, chapter_name = monitor.get_chapter_info(file_path)
analysis = monitor.analyze_chapter_content(file_path)
print(f"\n## {chapter_name}")
if analysis:
print(f"- 角色互动: {len(analysis['character_interactions'])}")
print(f"- 关键事件: {len(analysis['key_events'])}")
print(f"- 物品变化: {len(analysis['new_items'])}")
else:
print("无效选择")
if __name__ == "__main__":
main()