# -*- coding: utf-8 -*-
"""Fast OCR contact import - OCR one out of every five screenshots.

The script samples the screenshots found in ``SCREENSHOT_DIR``, sends each
sampled image to a local chat endpoint for OCR, filters the returned lines
down to plausible contact names, inserts names that are not yet in the
SQLite ``contacts`` table, and finally dumps the table to a JSON file.
"""
import base64
import glob
import json
import os
import sqlite3
import time  # noqa: F401
from contextlib import closing

import requests
from PIL import Image  # noqa: F401

DB_PATH = r'D:\夏骥\微信研究\contacts.db'
SCREENSHOT_DIR = r"D:\夏骥\微信研究\scroll_complete"
JSON_PATH = r"D:\夏骥\微信研究\contacts_data.json"

OCR_URL = "http://localhost:11434/api/chat"
OCR_MODEL = "glm-ocr"
OCR_TIMEOUT_SECONDS = 60
OCR_PROMPT = (
    "识别图片中的所有联系人名称。要求:\n"
    "1. 只输出联系人名称,每行一个\n"
    "2. 忽略分组标题(如星号、字母A-Z等)\n"
    "3. 忽略数字统计\n"
    "4. 不要添加任何其他内容"
)

# Only the first screenshot of every SAMPLE_STEP consecutive files is OCR'd.
SAMPLE_STEP = 5

DEFAULT_BLESSING = '马年新春快乐!愿您在新的一年里,事业腾飞,马到成功!'

# Lines that are exactly one of these strings are never treated as contacts.
INVALID_NAMES = frozenset([
    "公众号", "服务号", "企业微信联系人", "我的企业", "联系人",
    "星标朋友", "新的朋友", "群聊", "标签", "仅聊天", "设备",
])

_QUOTE_CHARS = '"\''
# Both ASCII and full-width comma/colon are listed, plus the ideographic
# full stop, so either punctuation style is trimmed from the end of a name.
_TRAILING_PUNCTUATION = ',,。::'


def ocr_image(image_path):
    """Run OCR on a single image and return the raw text of the reply.

    Args:
        image_path: Path of the image file to send.

    Returns:
        The ``message.content`` string of the JSON reply, or ``""`` when the
        request fails, the server answers with an HTTP error status, or the
        reply does not have the expected shape.

    Raises:
        OSError: If the image file cannot be read.
    """
    with open(image_path, 'rb') as f:
        image_base64 = base64.b64encode(f.read()).decode('utf-8')

    payload = {
        "model": OCR_MODEL,
        "messages": [{
            "role": "user",
            "content": OCR_PROMPT,
            "images": [image_base64],
        }],
        "stream": False,
    }

    try:
        response = requests.post(
            OCR_URL, json=payload, timeout=OCR_TIMEOUT_SECONDS
        )
        # Report HTTP errors as failures instead of parsing an error body.
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"OCR失败: {e}")
        return ""

    # Tolerate a null or non-dict payload rather than raising AttributeError.
    message = data.get('message') if isinstance(data, dict) else None
    content = message.get('content') if isinstance(message, dict) else None
    return content if isinstance(content, str) else ""


def is_valid_contact(line):
    """Return True when *line* looks like a contact name.

    A line is rejected when, after stripping whitespace, it is shorter than
    two characters, is one of ``INVALID_NAMES``, or starts with ``>`` or
    ``!``.
    """
    line = line.strip()
    if len(line) < 2:
        return False
    if line in INVALID_NAMES:
        return False
    return not line.startswith((">", "!"))


def clean_contact_name(name):
    """Strip surrounding quotes, trailing punctuation and whitespace.

    Quotes and trailing punctuation are removed from the right in a single
    pass, so a trailing quote followed by punctuation (``"name",``) does not
    leave a dangling quote behind.
    """
    name = name.strip().lstrip(_QUOTE_CHARS)
    name = name.rstrip(_QUOTE_CHARS + _TRAILING_PUNCTUATION)
    return name.strip()


def _extract_contact_names(ocr_text):
    """Return the cleaned, valid contact names found in raw OCR text."""
    names = []
    for line in ocr_text.splitlines():
        cleaned = clean_contact_name(line)
        # Validate after cleaning so quoted or punctuated headers are
        # filtered too.
        if is_valid_contact(cleaned):
            names.append(cleaned)
    return names


def _contact_sort_key(name):
    """Sort names starting with a letter first, compared case-insensitively."""
    starts_alpha = bool(name) and name[0].isalpha()
    return (not starts_alpha, name.lower() if starts_alpha else name)


def get_existing_contacts(db_path=DB_PATH):
    """Return the set of contact names stored in the database.

    Args:
        db_path: SQLite database file; defaults to ``DB_PATH``.
    """
    # closing() guarantees the connection is released even if the query fails.
    with closing(sqlite3.connect(db_path)) as conn:
        rows = conn.execute('SELECT name FROM contacts').fetchall()
    return {row[0] for row in rows}


def add_new_contacts(new_contacts, db_path=DB_PATH):
    """Insert new contacts into the database.

    Ids continue from the current ``MAX(id)`` of the table. All rows are
    written in one transaction: either every contact is inserted or none.

    Args:
        new_contacts: Iterable of contact names, inserted in iteration order.
        db_path: SQLite database file; defaults to ``DB_PATH``.

    Returns:
        The number of rows inserted.
    """
    new_contacts = list(new_contacts)
    if not new_contacts:
        return 0

    with closing(sqlite3.connect(db_path)) as conn:
        # "with conn" commits on success and rolls back on any exception.
        with conn:
            max_id = conn.execute(
                'SELECT MAX(id) FROM contacts'
            ).fetchone()[0] or 0
            rows = [
                (idx, name, '', DEFAULT_BLESSING, False)
                for idx, name in enumerate(new_contacts, start=max_id + 1)
            ]
            conn.executemany(
                '''
                INSERT INTO contacts (id, name, category, blessing, selected)
                VALUES (?, ?, ?, ?, ?)
                ''',
                rows,
            )
    return len(rows)


def _export_contacts_json(db_path=DB_PATH, json_file=JSON_PATH):
    """Dump every row of the contacts table to *json_file*; return the rows."""
    with closing(sqlite3.connect(db_path)) as conn:
        # Columns are named explicitly so the mapping below does not depend
        # on the table's physical column order.
        rows = conn.execute(
            'SELECT id, name, category, blessing, selected '
            'FROM contacts ORDER BY id'
        ).fetchall()

    all_contacts = [
        {
            "id": row[0],
            "name": row[1],
            "category": row[2],
            "blessing": row[3],
            "selected": bool(row[4]),
        }
        for row in rows
    ]
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(all_contacts, f, ensure_ascii=False, indent=2)
    return all_contacts


def main():
    """OCR sampled screenshots, store new contacts, and refresh the JSON."""
    print("=" * 60)
    print("快速OCR识别 - 采样模式")
    print("=" * 60)

    screenshots = sorted(glob.glob(os.path.join(SCREENSHOT_DIR, "*.png")))
    if not screenshots:
        print("未找到截图文件!")
        return
    print(f"找到 {len(screenshots)} 张截图")

    existing_contacts = get_existing_contacts()
    print(f"数据库中已有 {len(existing_contacts)} 个联系人")

    all_new_contacts = set()
    skipped_count = 0

    # Only the first screenshot of each batch of SAMPLE_STEP is recognised.
    for i in range(0, len(screenshots), SAMPLE_STEP):
        print(f"\n[{i+1}/{len(screenshots)}] 处理批次 {i//SAMPLE_STEP + 1}")

        new_in_this = 0
        for cleaned in _extract_contact_names(ocr_image(screenshots[i])):
            if cleaned in existing_contacts:
                skipped_count += 1
            elif cleaned not in all_new_contacts:
                new_in_this += 1
                all_new_contacts.add(cleaned)
                print(f" + {cleaned}")

        print(f" 本轮新增 {new_in_this},累计新发现 {len(all_new_contacts)}")

    print(f"\n{'='*60}")
    print("OCR完成!")
    print(f"发现新联系人: {len(all_new_contacts)} 个")
    print(f"跳过已存在: {skipped_count} 个")

    if all_new_contacts:
        added = add_new_contacts(
            sorted(all_new_contacts, key=_contact_sort_key)
        )
        print(f"成功入库: {added} 个")

    all_contacts = _export_contacts_json()
    print(f"JSON数据已更新: {JSON_PATH}")
    print(f"数据库总联系人: {len(all_contacts)} 个")


if __name__ == '__main__':
    main()