# Files
# weixin-holiday-message/batch_ocr_complete.py
#
# 189 lines
# 5.7 KiB
# Python
# Raw Permalink Normal View History

# -*- coding: utf-8 -*-
"""
对完整截图进行OCR识别并与数据库对比去重
"""
import os
import sqlite3
import requests
import base64
from PIL import Image
import glob
import json
def ocr_image(image_path, *, url="http://localhost:11434/api/chat",
              model="glm-ocr", timeout=60):
    """Run OCR on a single image and return the recognised text.

    The image file is base64-encoded and POSTed as JSON to a chat-style
    HTTP endpoint together with a prompt asking for contact names, one per
    line. (The default URL looks like a local Ollama server -- confirm.)

    Args:
        image_path: Path of the image file to send.
        url: Endpoint that receives the JSON payload.
        model: Model name placed in the payload.
        timeout: Request timeout in seconds.

    Returns:
        The ``message.content`` string of the JSON response, or ``""`` when
        the request fails, the server answers with an error status, the
        body is not valid JSON, or the expected fields are missing.

    Raises:
        OSError: If ``image_path`` cannot be opened or read. Reading happens
            outside the request error handling, so a bad path is not
            reported as an OCR failure.
    """
    with open(image_path, 'rb') as f:
        image_base64 = base64.b64encode(f.read()).decode('utf-8')

    # Same text as a single multi-line literal, spelled with explicit "\n".
    prompt = (
        "识别图片中的所有联系人名称。要求:\n"
        "1. 只输出联系人名称每行一个\n"
        "2. 忽略分组标题如星号字母A-Z等\n"
        "3. 忽略数字统计\n"
        "4. 不要添加任何其他内容"
    )
    payload = {
        "model": model,
        "messages": [{
            "role": "user",
            "content": prompt,
            "images": [image_base64],
        }],
        "stream": False,
    }

    try:
        response = requests.post(url, json=payload, timeout=timeout)
        # Without this an HTTP 4xx/5xx would be treated as a normal answer.
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors raised by older versions
        # of requests, where they are not RequestException subclasses.
        print(f"OCR失败: {e}")
        return ""

    # Walk the response defensively: callers call .strip() on the result,
    # so anything that is not a string must become "".
    message = data.get('message') if isinstance(data, dict) else None
    content = message.get('content') if isinstance(message, dict) else None
    return content if isinstance(content, str) else ""
def is_valid_contact(line):
    """Return True when an OCR output line looks like a contact name.

    A line is rejected when, after stripping surrounding whitespace, it

    * is shorter than two characters (this also covers single-letter
      index headers),
    * is exactly one of the known section labels of the contact list, or
    * starts with a character that marks markup / structured output rather
      than a name (``>``, ``!``, ``"``, ``{``, ``[`` or a code fence).

    Args:
        line: One line of OCR text.

    Returns:
        ``True`` if the line should be treated as a contact name,
        ``False`` otherwise.
    """
    line = line.strip()
    if len(line) < 2:
        return False

    section_labels = {
        "公众号", "服务号", "企业微信联系人", "我的企业", "联系人",
        "星标朋友", "新的朋友", "群聊", "标签", "仅聊天", "设备",
    }
    if line in section_labels:
        return False

    # str.startswith accepts a tuple, so one call covers every prefix.
    noise_prefixes = (">", "!", '"', "{", "[", "```")
    return not line.startswith(noise_prefixes)
def clean_contact_name(name):
    """Strip decoration that OCR output tends to leave around a name.

    Removes surrounding whitespace, surrounding single/double quotes and
    trailing comma / full-stop / colon punctuation. The steps are repeated
    until the text stops changing, so the result does not depend on the
    order in which the decorations appear (``'"Ann",'`` and ``' "Ann" '``
    both become ``'Ann'``).

    Args:
        name: Raw candidate name.

    Returns:
        The cleaned name; may be an empty string.
    """
    quotes = '"\''
    # ASCII and fullwidth comma/colon plus the ideographic full stop. The
    # fullwidth forms are written as escapes so that text normalisation of
    # this file cannot silently fold them into their ASCII twins.
    trailing_punct = ',\uff0c。:\uff1a'

    cleaned = name
    while True:
        stripped = cleaned.strip().strip(quotes).rstrip(trailing_punct)
        if stripped == cleaned:
            return cleaned
        # Every change shortens the string, so the loop terminates.
        cleaned = stripped
def get_existing_contacts(db_path=r'D:\夏骥\微信研究\contacts.db'):
    """Load the names already stored in the ``contacts`` table.

    Args:
        db_path: Path of the SQLite database file. Defaults to the
            location this script has always used.

    Returns:
        A set with the ``name`` value of every row in ``contacts``.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried (for
            example when the ``contacts`` table does not exist).
    """
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute('SELECT name FROM contacts').fetchall()
    finally:
        # Close even when the query fails so the file handle is released.
        conn.close()
    return {row[0] for row in rows}
def add_new_contacts(new_contacts, db_path=r'D:\夏骥\微信研究\contacts.db'):
    """Insert new contact names into the ``contacts`` table.

    Each name gets the next free id (current ``MAX(id)`` + 1, + 2, ...),
    an empty category, the default greeting text and ``selected = False``.
    All rows are written in one transaction: if any insert fails nothing
    is committed.

    Args:
        new_contacts: Names to insert, in the order ids should be assigned.
        db_path: Path of the SQLite database file. Defaults to the
            location this script has always used.

    Returns:
        The number of rows inserted (``0`` when ``new_contacts`` is empty).

    Raises:
        sqlite3.Error: If the database cannot be opened or written.
    """
    if not new_contacts:
        return 0

    names = list(new_contacts)
    default_blessing = '马年新春快乐!愿您在新的一年里,事业腾飞,马到成功!'

    conn = sqlite3.connect(db_path)
    try:
        # MAX(id) is NULL for an empty table; start numbering from 1 then.
        max_id = conn.execute('SELECT MAX(id) FROM contacts').fetchone()[0] or 0
        rows = [
            (new_id, name, '', default_blessing, False)
            for new_id, name in enumerate(names, start=max_id + 1)
        ]
        conn.executemany(
            'INSERT INTO contacts (id, name, category, blessing, selected) '
            'VALUES (?, ?, ?, ?, ?)',
            rows,
        )
        conn.commit()
    finally:
        # Closing without a commit discards a half-finished batch.
        conn.close()
    return len(rows)
def main():
    """OCR every screenshot, store unseen contact names and export JSON.

    Steps:

    1. Collect the ``*.png`` files of the screenshot directory (sorted by
       path); stop early if the directory or the files are missing.
    2. Load the names already in the database.
    3. OCR each screenshot, keep lines accepted by ``is_valid_contact``,
       clean them, and sort them into "already known" (counted as skipped)
       and "new" (collected once, even if seen on several screenshots).
    4. Insert the new names into the database.
    5. Dump the whole ``contacts`` table to a JSON file.

    Progress is reported on stdout. Returns ``None``.
    """
    scroll_dir = r"D:\夏骥\微信研究\scroll_complete"
    db_path = r'D:\夏骥\微信研究\contacts.db'
    json_file = r"D:\夏骥\微信研究\contacts_data.json"

    def sort_key(name):
        # Names starting with a letter come first, compared
        # case-insensitively; everything else follows, compared verbatim.
        starts_alpha = name[:1].isalpha()
        return (not starts_alpha, name.lower() if starts_alpha else name)

    print("=" * 60)
    print("批量OCR识别并去重入库")
    print("=" * 60)

    # Locate the screenshots.
    if not os.path.exists(scroll_dir):
        print(f"目录不存在: {scroll_dir}")
        return
    screenshots = sorted(glob.glob(os.path.join(scroll_dir, "*.png")))
    if not screenshots:
        print("未找到截图文件!")
        return
    print(f"找到 {len(screenshots)} 张截图")

    # Names that are already stored.
    existing_contacts = get_existing_contacts()
    print(f"数据库中已有 {len(existing_contacts)} 个联系人")

    all_new_contacts = set()
    skipped_count = 0
    for position, path in enumerate(screenshots, start=1):
        print(f"\n[{position}/{len(screenshots)}] {os.path.basename(path)}")
        result = ocr_image(path)
        new_in_this = 0
        for raw_line in result.strip().split('\n'):
            line = raw_line.strip()
            if not is_valid_contact(line):
                continue
            cleaned = clean_contact_name(line)
            # Cleaning can shorten the text below the two-character minimum.
            if len(cleaned) < 2:
                continue
            if cleaned in existing_contacts:
                skipped_count += 1
                print(f" - {cleaned} (已存在,跳过)")
            elif cleaned not in all_new_contacts:
                new_in_this += 1
                print(f" + {cleaned} (新)")
                all_new_contacts.add(cleaned)
        print(f" 本轮新增 {new_in_this},累计新发现 {len(all_new_contacts)},跳过 {skipped_count}")

    print(f"\n{'='*60}")
    print("OCR完成")
    print(f"发现新联系人: {len(all_new_contacts)}")
    print(f"跳过已存在: {skipped_count}")

    # Store the new names.
    if all_new_contacts:
        added = add_new_contacts(sorted(all_new_contacts, key=sort_key))
        print(f"成功入库: {added}")

    # Refresh the JSON export from the database.
    # NOTE(review): the export is assumed to run whether or not new
    # contacts were found; confirm this matches the intended nesting.
    conn = sqlite3.connect(db_path)
    try:
        # Columns are named explicitly so the mapping below does not depend
        # on the physical column order of the table.
        rows = conn.execute(
            'SELECT id, name, category, blessing, selected '
            'FROM contacts ORDER BY id'
        ).fetchall()
    finally:
        conn.close()

    all_contacts = [
        {
            "id": contact_id,
            "name": name,
            "category": category,
            "blessing": blessing,
            "selected": bool(selected),
        }
        for contact_id, name, category, blessing, selected in rows
    ]
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(all_contacts, f, ensure_ascii=False, indent=2)
    print(f"JSON数据已更新: {json_file}")
    print(f"数据库总联系人: {len(all_contacts)}")
# Run the batch job only when executed as a script, not when imported.
if __name__ == "__main__":
    main()