225 lines
6.4 KiB
Python
225 lines
6.4 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
使用OCR识别微信通讯录 - 两阶段处理
|
|||
|
|
第一阶段:快速滚动截图直到到底
|
|||
|
|
第二阶段:批量OCR识别所有截图
|
|||
|
|
"""
|
|||
|
|
import uiautomation as auto
|
|||
|
|
import time
|
|||
|
|
import requests
|
|||
|
|
import base64
|
|||
|
|
import os
|
|||
|
|
from PIL import Image
|
|||
|
|
|
|||
|
|
|
|||
|
|
def capture_wechat_window():
    """Find the WeChat window and save a screenshot of it.

    Returns:
        A ``(screenshot_path, window)`` tuple on success, or ``(None, None)``
        when no window named '微信' is found.
    """
    window = auto.WindowControl(searchDepth=1, Name='微信')

    # NOTE(review): Exists(3, 1) presumably waits up to 3 s, polling every
    # 1 s - confirm against the uiautomation documentation.
    found = window.Exists(3, 1)
    if found:
        print(f"找到微信窗口: {window.Name}")

        target = r"D:\夏骥\微信研究\wechat_screenshot.png"
        window.CaptureToImage(target)
        return target, window

    print("未找到微信窗口!请确保微信已打开并登录。")
    return None, None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def capture_contact_region(wechat_window, index):
    """Save a fixed crop of the WeChat window as a numbered PNG.

    Args:
        wechat_window: Window control exposing ``BoundingRectangle`` and
            ``ToBitmap``.
        index: Screenshot sequence number; zero-padded to three digits in
            the file name.

    Returns:
        The path of the written PNG, or ``None`` if capturing or saving
        raised an exception.
    """
    bounds = wechat_window.BoundingRectangle

    # Fixed crop rectangle handed to ToBitmap; the height follows the window
    # height minus 160.
    # NOTE(review): offsets are presumably relative to the window's top-left
    # corner - confirm against uiautomation's ToBitmap.
    region = {
        "x": 70,
        "y": 130,
        "width": 280,
        "height": bounds.height() - 160,
    }

    screenshot_path = f"D:\\夏骥\\微信研究\\scroll\\region_{index:03d}.png"

    try:
        wechat_window.ToBitmap(**region).ToFile(screenshot_path)
    except Exception as e:
        print(f"截图失败: {e}")
        return None
    return screenshot_path
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_image_hash(image_path):
    """Compute an average hash of an image as a string of '0'/'1' characters.

    The image is resized to 16x16, converted to greyscale, and each pixel is
    emitted as '1' when it is brighter than the mean and '0' otherwise, giving
    a 256-character string.

    Args:
        image_path: Path of the image file to hash.

    Returns:
        The hash string, or ``None`` if the image could not be read or
        processed (the reason is printed).
    """
    try:
        # The context manager closes the underlying file handle promptly.
        with Image.open(image_path) as source:
            thumb = source.resize((16, 16), Image.Resampling.LANCZOS).convert('L')

        # get_flattened_data() is only present on recent Pillow releases;
        # fall back to getdata() so older installs do not fail here.
        read_pixels = getattr(thumb, 'get_flattened_data', None) or thumb.getdata
        pixels = list(read_pixels())
        avg = sum(pixels) / len(pixels)
        return ''.join('1' if p > avg else '0' for p in pixels)
    except Exception as e:
        # Report the failure: a None hash makes the caller's end-of-list
        # detection a no-op, so it must not happen silently.
        print(f"计算图片哈希失败: {e}")
        return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def images_similarity(hash1, hash2):
    """Return the fraction of matching positions between two hash strings.

    Args:
        hash1: First hash string (or ``None``/empty).
        hash2: Second hash string (or ``None``/empty).

    Returns:
        ``0`` when either hash is missing or empty; otherwise a value from
        0.0 (no position matches) to 1.0 (identical).
    """
    if not hash1 or not hash2:
        return 0

    mismatches = sum(a != b for a, b in zip(hash1, hash2))
    # zip() stops at the shorter hash; positions that exist in only one of
    # the two are mismatches, not silent matches.
    mismatches += abs(len(hash1) - len(hash2))
    return 1 - mismatches / max(len(hash1), len(hash2))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def scroll_down(wechat_window):
    """Click inside the WeChat window and scroll the mouse wheel down.

    The cursor is moved to a point 200 to the right of and 400 below the
    window's top-left corner, clicked, and the wheel is scrolled down three
    times, with short pauses in between.

    Args:
        wechat_window: Window control exposing ``BoundingRectangle``.

    Returns:
        ``True`` if every step completed, ``False`` if any step raised an
        exception (the reason is printed).
    """
    try:
        rect = wechat_window.BoundingRectangle
        target_x = rect.left + 200
        target_y = rect.top + 400

        auto.SetCursorPos(target_x, target_y)
        auto.Click(target_x, target_y)
        time.sleep(0.2)
        auto.WheelDown(wheelTimes=3)
        time.sleep(0.3)
        return True
    except Exception as e:
        # Catch Exception rather than everything, so Ctrl+C (KeyboardInterrupt)
        # during the sleeps still stops the caller's scroll loop.
        print(f"滚动失败: {e}")
        return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
def ocr_image(image_path):
    """Send one image to the local OCR chat endpoint and return its reply.

    The file is base64-encoded and posted to
    ``http://localhost:11434/api/chat`` with the ``glm-ocr`` model and a
    prompt asking for contact names only, one per line.
    NOTE(review): port 11434 and the payload shape look like an Ollama
    server - confirm.

    Args:
        image_path: Path of the screenshot to recognize.

    Returns:
        The text in the reply's ``message.content`` field, or ``""`` when the
        request fails, the server answers with an HTTP error, or the field is
        missing (request failures are printed).

    Raises:
        OSError: If ``image_path`` cannot be opened or read.
    """
    with open(image_path, 'rb') as f:
        image_base64 = base64.b64encode(f.read()).decode('utf-8')

    url = "http://localhost:11434/api/chat"
    payload = {
        "model": "glm-ocr",
        "messages": [{
            "role": "user",
            "content": """识别图片中的所有联系人名称。要求:
1. 只输出联系人名称,每行一个
2. 忽略分组标题(如星号、字母A-Z等)
3. 忽略数字统计
4. 不要添加任何其他内容""",
            "images": [image_base64]
        }],
        "stream": False
    }

    try:
        response = requests.post(url, json=payload, timeout=60)
        # Without this check an HTTP error reply (no 'message' key) would be
        # returned as "" with no hint about what went wrong.
        response.raise_for_status()
        # 'or' guards against a present-but-null message/content field.
        message = response.json().get('message') or {}
        return message.get('content') or ''
    except Exception as e:
        print(f"OCR失败: {e}")
        return ""
|
|||
|
|
|
|||
|
|
|
|||
|
|
def is_valid_contact(line):
    """Decide whether an OCR output line looks like a contact name.

    A line is rejected when, after stripping whitespace, it is empty, equals
    one of the known section titles, is a single ASCII letter (an index
    header), or starts with ``>`` or ``!``.

    Args:
        line: One line of OCR output.

    Returns:
        ``True`` if the line should be kept as a contact name, else ``False``.
    """
    line = line.strip()
    if not line:
        return False

    # Section titles shown in the contact list rather than contact names.
    section_titles = {
        "公众号", "服务号", "企业微信联系人", "我的企业", "联系人",
        "星标朋友", "新的朋友", "群聊", "标签", "仅聊天", "设备",
    }
    if line in section_titles:
        return False

    # str.isalpha() is also true for CJK characters, so without the ASCII
    # check a one-character Chinese name would be dropped as an index header.
    if len(line) == 1 and line.isascii() and line.isalpha():
        return False

    if line.startswith((">", "!")):
        return False

    return True
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
    """Run the two-phase workflow: capture screenshots, OCR them, save names.

    Creates the screenshot directory, removes old ``.png`` files from it,
    captures the contact list while scrolling, runs OCR on every screenshot,
    and writes the sorted, de-duplicated names to a text file. Returns early
    (after the cleanup) when the WeChat window cannot be found.
    """
    print("=" * 60)
    print("微信通讯录OCR识别 - 两阶段处理")
    print("=" * 60)

    # Create the screenshot directory.
    scroll_dir = r"D:\夏骥\微信研究\scroll"
    os.makedirs(scroll_dir, exist_ok=True)

    # Remove screenshots left over from a previous run.
    for name in os.listdir(scroll_dir):
        if name.endswith('.png'):
            os.remove(os.path.join(scroll_dir, name))

    # ===== Phase 1: scroll and capture =====
    print("\n[阶段1] 滚动截图中...")

    _, wechat_window = capture_wechat_window()
    if not wechat_window:
        return

    screenshots = _collect_screenshots(wechat_window)
    print(f"\n[阶段1完成] 共截图 {len(screenshots)} 张")

    # ===== Phase 2: batch OCR =====
    print("\n[阶段2] OCR识别中...")
    all_contacts = _recognize_contacts(screenshots)

    # ===== Save the result =====
    print("\n[保存结果]")
    result_file = r"D:\夏骥\微信研究\ocr_result.txt"
    _save_contacts(all_contacts, len(screenshots), result_file)

    print(f"结果已保存: {result_file}")
    print(f"\n共识别到 {len(all_contacts)} 个联系人")


def _collect_screenshots(wechat_window, max_screenshots=100):
    """Capture the contact region repeatedly while scrolling down.

    Stops after ``max_screenshots`` attempts, or once two consecutive
    comparisons with the previous screenshot exceed 0.95 similarity, which is
    taken to mean the list no longer moves.

    Returns:
        The list of screenshot paths that were captured successfully.
    """
    screenshots = []
    last_hash = None
    no_change = 0

    for i in range(max_screenshots):
        path = capture_contact_region(wechat_window, i)
        if path:
            screenshots.append(path)
            print(f" 截图 {i+1}: {path}")

            # Check whether the bottom of the list has been reached.
            current_hash = get_image_hash(path)
            if last_hash:
                sim = images_similarity(last_hash, current_hash)
                if sim > 0.95:
                    no_change += 1
                    if no_change >= 2:
                        print(f"\n 检测到到底,共截图 {len(screenshots)} 张")
                        break
                else:
                    no_change = 0
            last_hash = current_hash

        scroll_down(wechat_window)

    return screenshots


def _recognize_contacts(screenshots):
    """Run OCR on each screenshot and return the set of valid contact names."""
    all_contacts = set()
    total = len(screenshots)

    for i, path in enumerate(screenshots):
        print(f" OCR {i+1}/{total}: ", end="", flush=True)
        result = ocr_image(path)

        new_count = 0
        for line in result.strip().split('\n'):
            line = line.strip()
            if is_valid_contact(line) and line not in all_contacts:
                new_count += 1
                all_contacts.add(line)

        print(f"新增 {new_count} 个,累计 {len(all_contacts)} 个")

    return all_contacts


def _save_contacts(all_contacts, screenshot_count, result_file):
    """Write a summary header and the sorted contact names to ``result_file``."""
    # Names starting with an alphabetic character sort first and compare
    # case-insensitively; all other names follow, compared as-is.
    sorted_contacts = sorted(
        all_contacts,
        key=lambda x: (not x[0].isalpha(), x.lower() if x[0].isalpha() else x),
    )

    with open(result_file, 'w', encoding='utf-8') as out:
        out.write("微信通讯录OCR识别结果\n")
        out.write(f"共截图 {screenshot_count} 张\n")
        out.write(f"共识别 {len(all_contacts)} 个联系人\n")
        out.write("=" * 60 + "\n\n")
        for c in sorted_contacts:
            out.write(f"{c}\n")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Run the capture/OCR workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
|