From 8600c0f5764fa29ee93c7f9061ef587ab26b0f04 Mon Sep 17 00:00:00 2001 From: xiaji Date: Fri, 6 Mar 2026 15:07:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=88=9D=E5=A7=8B=E6=8F=90=E4=BA=A4=20?= =?UTF-8?q?-=20=E6=BB=9A=E5=8A=A8=E6=88=AA=E5=B1=8FOCR=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 实现智能区域检测算法(灰度阈值 + 连续行判定) - 支持Umi-OCR和自定义HTTP OCR服务 - 添加热键触发和鼠标框选区域功能 - 实现自动滚动和智能停止逻辑 - 添加完整的README文档 --- .gitignore | 46 ++++ README.md | 215 +++++++++++++++ main.py | 604 ++++++++++++++++++++++++++++++++++++++++++ ocr_server_example.py | 146 ++++++++++ requirements.txt | 8 + umi_ocr_client.py | 228 ++++++++++++++++ 6 files changed, 1247 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 main.py create mode 100644 ocr_server_example.py create mode 100644 requirements.txt create mode 100644 umi_ocr_client.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d2014f9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,46 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +env/ +ENV/ +.venv/ +.env/ + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Output directory +output/ + +# Logs +*.log +logs/ + +# OS +.DS_Store +Thumbs.db diff --git a/README.md b/README.md new file mode 100644 index 0000000..d31b424 --- /dev/null +++ b/README.md @@ -0,0 +1,215 @@ +# 滚动截屏OCR工具 + +一个智能的滚动截屏OCR工具,可以自动识别页面中的内容区块(div),滚动截屏并进行OCR文字识别。 + +## 功能特点 + +- 🎯 **智能区域检测**:使用灰度阈值 + 连续行判定算法,自动识别内容区块(div)和空白间隔 +- 📜 **自动滚动截屏**:根据内容高度自动计算滚动距离,连续截屏 +- 🔤 **OCR文字识别**:支持 Umi-OCR 和自定义HTTP OCR服务 +- ⌨️ **热键触发**:按 `Ctrl+F9` 快速启动 +- 🖱️ **框选区域**:拖动鼠标选择截图区域 +- 🛑 **智能停止**:检测到重复内容时自动停止 + +## 适用场景 + +- 长网页滚动截图OCR +- 聊天记录导出 +- 长文档内容提取 +- 任何需要滚动才能看完全部的内容 + +## 安装 + +### 1. 克隆仓库 + +```bash +git clone <远程仓库地址> +cd long-screen-cut +``` + +### 2. 安装依赖 + +```bash +pip install -r requirements.txt +``` + +依赖列表: +- opencv-python >= 4.8.0 +- numpy >= 1.24.0 +- pillow >= 10.0.0 +- pyautogui >= 0.9.54 +- keyboard >= 0.13.5 +- mouse >= 0.7.1 +- requests >= 2.31.0 +- loguru >= 0.7.0 + +### 3. 安装OCR引擎(二选一) + +#### 方案A:Umi-OCR(推荐) + +1. 下载 [Umi-OCR](https://github.com/hiroi-sora/Umi-OCR/releases) +2. 解压并运行 `Umi-OCR.exe` +3. 进入 **设置 → HTTP接口** +4. 勾选 **启用HTTP服务** +5. 确保端口为 `1224`(默认) + +#### 方案B:自定义HTTP OCR服务 + +参考 `ocr_server_example.py` 实现自己的OCR服务,或修改配置使用其他OCR API。 + +## 使用方法 + +### 启动程序 + +```bash +python main.py +``` + +### 操作流程 + +1. **等待热键**:程序启动后会显示 `等待热键 Ctrl+F9 启动...` +2. **触发截屏**:按 `Ctrl+F9` +3. **检查服务**:程序会检查OCR服务是否运行 +4. **框选区域**:按住鼠标左键拖动,选择要截图的区域 +5. **自动处理**:程序会自动: + - 截取当前屏幕 + - 分析内容区块(div) + - OCR识别文字 + - 计算滚动距离 + - 滚动到下一屏 + - 重复上述过程 +6. **自动停止**:当检测到重复内容时自动停止 + +### 输出结果 + +- 截图保存在 `output/` 目录 +- OCR结果保存在 `output/all_results_时间戳.json` + +## 配置说明 + +编辑 `main.py` 中的 `Config` 类: + +```python +class Config: + # 热键设置 + HOTKEY = "ctrl+f9" + + # 图像分析参数 + GRAY_THRESHOLD = 240 # 灰度阈值(0-255) + CONSECUTIVE_LINES = 3 # 连续多少行判定为空白 + WHITE_PIXEL_RATIO = 0.9 # 白色像素比例阈值 + + # OCR设置 + OCR_ENGINE = "umi" # "umi" 或 "http" + OCR_API_URL = "http://localhost:8000/ocr" # HTTP模式时使用 + OCR_TIMEOUT = 30 # OCR请求超时时间(秒) + + # Umi-OCR设置 + UMI_OCR_HOST = "127.0.0.1" + UMI_OCR_PORT = 1224 + + # 滚动设置 + SCROLL_DELAY = 0.5 # 滚动后等待渲染时间(秒) + MAX_SCROLL_COUNT = 100 # 最大滚动次数 + + # 输出设置 + OUTPUT_DIR = "output" +``` + +## 核心算法 + +### 内容区块检测算法 + +1. **灰度转换**:将截图转换为灰度图 +2. **逐行扫描**:计算每行的白色像素比例 +3. **空白判定**:如果一行中超过 `WHITE_PIXEL_RATIO`(默认90%)的像素灰度值 > `GRAY_THRESHOLD`(默认240),则认为是空白行 +4. **连续判定**:连续 `CONSECUTIVE_LINES`(默认3行)空白行视为间隔区域 +5. **区块划分**:非空白行区域视为内容区块(div) + +### 滚动距离计算 + +``` +滚动距离 = 第一个div高度 + 其后空白间隔高度 - 重叠区域 +``` + +重叠区域确保连续性,默认为div高度的1/4。 + +## 项目结构 + +``` +long-screen-cut/ +├── main.py # 主程序 +├── umi_ocr_client.py # Umi-OCR HTTP客户端 +├── ocr_server_example.py # OCR服务示例(Flask) +├── requirements.txt # Python依赖 +├── .gitignore # Git忽略配置 +└── README.md # 本文件 +``` + +## API文档 + +### UmiOCRClient + +```python +from umi_ocr_client import UmiOCRClient + +client = UmiOCRClient(host="127.0.0.1", port=1224) + +# 检查服务状态 +if client.is_service_running(): + print("服务运行中") + +# 截图识别 +text = client.recognize_screenshot() + +# 图片文件识别 +text = client.recognize_image("/path/to/image.png") + +# 批量识别 +texts = client.recognize_images(["1.png", "2.png", "3.png"]) +``` + +## 常见问题 + +### Q: 程序提示"Umi-OCR服务未运行" + +A: 请确保: +1. Umi-OCR软件已启动 +2. 进入 **设置 → HTTP接口** +3. 勾选 **启用HTTP服务** +4. 端口设置为 `1224` + +### Q: 识别区域不准确 + +A: 调整 `Config` 中的图像分析参数: +- `GRAY_THRESHOLD`:降低可以识别更浅的背景色 +- `CONSECUTIVE_LINES`:增加可以减少误判 +- `WHITE_PIXEL_RATIO`:降低可以容忍更多杂色 + +### Q: 滚动太快/太慢 + +A: 调整 `SCROLL_DELAY`: +- 网页加载慢:增加延迟 +- 本地应用:可以减少延迟 + +### Q: 如何停止程序 + +A: +- 正常停止:按 `Ctrl+C` +- 强制停止:关闭终端窗口 + +## 开发计划 + +- [ ] 支持更多OCR引擎(PaddleOCR、Tesseract等) +- [ ] GUI界面 +- [ ] 支持水平滚动 +- [ ] 智能去重(相似度判断) +- [ ] 导出为多种格式(Markdown、Word、PDF) + +## 许可证 + +MIT License + +## 致谢 + +- [Umi-OCR](https://github.com/hiroi-sora/Umi-OCR) - 优秀的离线OCR软件 diff --git a/main.py b/main.py new file mode 100644 index 0000000..df400bc --- /dev/null +++ b/main.py @@ -0,0 +1,604 @@ +""" +滚动截屏OCR工具 +功能:通过热键激活,手动框选区域后,自动滚动截屏并进行OCR识别 +""" + +import json +import time +import base64 +import io +import tempfile +from dataclasses import dataclass, field +from typing import List, Tuple, Optional, Callable +from pathlib import Path + +import cv2 +import numpy as np +import requests +from PIL import Image +import pyautogui +import keyboard +import mouse +from loguru import logger + +from umi_ocr_client import UmiOCRClient, check_and_wait_for_service + + +@dataclass +class DivRegion: + """div区域数据结构""" + top: int + bottom: int + left: int + right: int + text: str = "" + + @property + def height(self) -> int: + return self.bottom - self.top + + @property + def width(self) -> int: + return self.right - self.left + + +@dataclass +class GapInfo: + """空白间隔信息""" + start_row: int + end_row: int + + @property + def height(self) -> int: + return self.end_row - self.start_row + + +@dataclass +class AnalysisResult: + """图像分析结果""" + divs: List[DivRegion] = field(default_factory=list) + gaps: List[GapInfo] = field(default_factory=list) + + +class Config: + """配置类""" + # 热键设置 + HOTKEY = "ctrl+f9" + + # 图像分析参数 + GRAY_THRESHOLD = 240 # 灰度阈值,接近白色的阈值 + CONSECUTIVE_LINES = 3 # 连续多少行判定为空白 + WHITE_PIXEL_RATIO = 0.9 # 一行中超过多少比例的像素为白色才认为是空白行 + + # OCR设置 + OCR_ENGINE = "umi" # OCR引擎: "umi" 使用Umi-OCR, "http" 使用HTTP接口 + OCR_API_URL = "http://localhost:8000/ocr" # HTTP OCR服务地址 (OCR_ENGINE=http时使用) + OCR_TIMEOUT = 30 # OCR请求超时时间 + + # Umi-OCR设置 + UMI_OCR_HOST = "127.0.0.1" + UMI_OCR_PORT = 1224 + + # 滚动设置 + SCROLL_DELAY = 0.5 # 滚动后等待渲染的时间(秒) + MAX_SCROLL_COUNT = 100 # 最大滚动次数,防止无限循环 + + # 输出设置 + OUTPUT_DIR = "output" + + +class RegionSelector: + """区域选择器 - 用于手动框选截图区域""" + + def __init__(self): + self.start_pos: Optional[Tuple[int, int]] = None + self.end_pos: Optional[Tuple[int, int]] = None + self.is_selecting = False + + def select_region(self) -> Tuple[int, int, int, int]: + """ + 手动选择区域,返回 (left, top, right, bottom) + 点击确定左上角,拖动释放确定右下角 + """ + logger.info("请按住鼠标左键拖动选择区域...") + print("\n>>> 请按住鼠标左键拖动选择截图区域,释放后确定 <<<") + + # 等待鼠标按下 + while not mouse.is_pressed(button='left'): + time.sleep(0.01) + + self.start_pos = mouse.get_position() + self.is_selecting = True + logger.info(f"选择开始位置: {self.start_pos}") + + # 等待鼠标释放 + while mouse.is_pressed(button='left'): + time.sleep(0.01) + + self.end_pos = mouse.get_position() + self.is_selecting = False + logger.info(f"选择结束位置: {self.end_pos}") + + # 计算边界 + left = min(self.start_pos[0], self.end_pos[0]) + top = min(self.start_pos[1], self.end_pos[1]) + right = max(self.start_pos[0], self.end_pos[0]) + bottom = max(self.start_pos[1], self.end_pos[1]) + + logger.info(f"选定区域: ({left}, {top}, {right}, {bottom}), 尺寸: {right-left}x{bottom-top}") + print(f"已选择区域: 左上角({left}, {top}), 右下角({right}, {bottom})") + + return left, top, right, bottom + + +class ImageAnalyzer: + """图像分析器 - 分析div边界和空白间隔""" + + def __init__(self, config: Config): + self.config = config + + def analyze(self, image: np.ndarray) -> AnalysisResult: + """ + 分析图像,定位div边界 + 使用灰度阈值 + 连续行判定 + """ + result = AnalysisResult() + + # 转换为灰度图 + if len(image.shape) == 3: + gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + else: + gray = image + + height, width = gray.shape + logger.debug(f"分析图像尺寸: {width}x{height}") + + # 逐行分析 + is_in_gap = False + gap_start = 0 + div_start = 0 + consecutive_blank = 0 + + for row in range(height): + # 计算当前行的白色像素比例 + white_pixels = np.sum(gray[row] > self.config.GRAY_THRESHOLD) + white_ratio = white_pixels / width + + is_blank = white_ratio > self.config.WHITE_PIXEL_RATIO + + if is_blank: + consecutive_blank += 1 + else: + # 如果之前是空白区域,现在进入div + if consecutive_blank >= self.config.CONSECUTIVE_LINES and not is_in_gap: + # 记录空白间隔 + gap_end = row - consecutive_blank + gap = GapInfo(start_row=gap_start, end_row=gap_end) + result.gaps.append(gap) + logger.debug(f"发现空白间隔: 行 {gap.start_row}-{gap.end_row}, 高度 {gap.height}") + + # 记录div开始 + div_start = row + is_in_gap = True + + consecutive_blank = 0 + gap_start = row + + # 如果连续多行都是空白,认为是间隔区域 + if consecutive_blank >= self.config.CONSECUTIVE_LINES and is_in_gap: + # 记录div结束 + div_end = row - consecutive_blank + if div_end > div_start: + div = DivRegion( + top=div_start, + bottom=div_end, + left=0, + right=width + ) + result.divs.append(div) + logger.debug(f"发现div区域: 行 {div.top}-{div.bottom}, 高度 {div.height}") + + is_in_gap = False + gap_start = row - consecutive_blank + 1 + + # 处理最后一个div(如果图像不以空白结束) + if not is_in_gap and div_start < height - consecutive_blank: + div = DivRegion( + top=div_start, + bottom=height - consecutive_blank, + left=0, + right=width + ) + result.divs.append(div) + logger.debug(f"发现末尾div区域: 行 {div.top}-{div.bottom}, 高度 {div.height}") + + logger.info(f"分析完成: 发现 {len(result.divs)} 个div, {len(result.gaps)} 个空白间隔") + return result + + def calculate_scroll_distance(self, result: AnalysisResult) -> int: + """ + 根据分析结果计算滚动距离 + 策略:滚动到下一个div的顶部 + """ + if not result.divs: + logger.warning("未检测到div,使用默认滚动距离") + return 100 + + # 获取第一个div和第一个空白间隔 + first_div = result.divs[0] + + # 如果有空白间隔,滚动距离为第一个div高度 + 其后的空白间隔 + scroll_distance = first_div.height + + # 查找第一个div之后的空白间隔 + for gap in result.gaps: + if gap.start_row >= first_div.bottom: + scroll_distance += gap.height + break + + # 添加一些重叠,确保连续性 + overlap = min(20, first_div.height // 4) + scroll_distance = max(scroll_distance - overlap, 50) + + logger.info(f"计算滚动距离: {scroll_distance} 像素") + return int(scroll_distance) + + +class OCREngine: + """OCR引擎 - 调用OCR服务识别文字""" + + def __init__(self, config: Config): + self.config = config + self.umi_client: Optional[UmiOCRClient] = None + + if config.OCR_ENGINE == "umi": + self.umi_client = UmiOCRClient( + host=config.UMI_OCR_HOST, + port=config.UMI_OCR_PORT + ) + + def _recognize_with_http(self, image: np.ndarray) -> List[str]: + """使用HTTP接口进行OCR识别""" + try: + # 将numpy数组转换为PIL Image + if len(image.shape) == 3: + pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) + else: + pil_image = Image.fromarray(image) + + # 转换为base64 + buffered = io.BytesIO() + pil_image.save(buffered, format="PNG") + img_base64 = base64.b64encode(buffered.getvalue()).decode() + + # 调用OCR API + response = requests.post( + self.config.OCR_API_URL, + json={"image": img_base64}, + timeout=self.config.OCR_TIMEOUT + ) + response.raise_for_status() + + data = response.json() + texts = data.get("texts", []) + return texts + + except requests.exceptions.ConnectionError: + logger.error(f"无法连接到OCR服务: {self.config.OCR_API_URL}") + return [] + except Exception as e: + logger.error(f"HTTP OCR识别失败: {e}") + return [] + + def _recognize_with_umi(self, image: np.ndarray) -> List[str]: + """使用Umi-OCR进行识别""" + if not self.umi_client: + logger.error("Umi-OCR客户端未初始化") + return [] + + try: + # 将图像保存为临时文件 + if len(image.shape) == 3: + pil_image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) + else: + pil_image = Image.fromarray(image) + + # 创建临时文件 + with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp_file: + tmp_path = tmp_file.name + pil_image.save(tmp_path, format="PNG") + + try: + # 调用Umi-OCR识别 + text = self.umi_client.recognize_image(tmp_path, timeout=self.config.OCR_TIMEOUT) + if text: + # 按行分割 + lines = [line.strip() for line in text.split('\n') if line.strip()] + return lines + return [] + finally: + # 删除临时文件 + try: + Path(tmp_path).unlink() + except Exception: + pass + + except Exception as e: + logger.error(f"Umi-OCR识别失败: {e}") + return [] + + def recognize(self, image: np.ndarray) -> List[str]: + """ + 对图像进行OCR识别 + 返回识别到的文字列表 + """ + if self.config.OCR_ENGINE == "umi": + texts = self._recognize_with_umi(image) + else: + texts = self._recognize_with_http(image) + + logger.info(f"OCR识别完成,识别到 {len(texts)} 段文字") + return texts + + def recognize_divs(self, image: np.ndarray, divs: List[DivRegion]) -> List[str]: + """ + 对每个div区域分别进行OCR识别 + """ + all_texts = [] + for i, div in enumerate(divs): + # 截取div区域 + div_image = image[div.top:div.bottom, div.left:div.right] + texts = self.recognize(div_image) + all_texts.extend(texts) + logger.debug(f"Div {i+1} OCR结果: {texts}") + return all_texts + + def check_service(self) -> bool: + """检查OCR服务是否可用""" + if self.config.OCR_ENGINE == "umi": + if not self.umi_client: + return False + return self.umi_client.is_service_running() + else: + try: + response = requests.get(self.config.OCR_API_URL.replace('/ocr', '/health'), timeout=2) + return response.status_code == 200 + except Exception: + return False + + +class ScrollCaptureOCR: + """滚动截屏OCR主类""" + + def __init__(self): + self.config = Config() + self.region_selector = RegionSelector() + self.image_analyzer = ImageAnalyzer(self.config) + self.ocr_engine = OCREngine(self.config) + + self.capture_region: Optional[Tuple[int, int, int, int]] = None + self.previous_ocr_result: List[str] = [] + self.scroll_count = 0 + self.all_results: List[dict] = [] + + # 创建输出目录 + Path(self.config.OUTPUT_DIR).mkdir(exist_ok=True) + + def capture_screen(self) -> np.ndarray: + """截取指定区域的屏幕""" + if not self.capture_region: + raise ValueError("未设置截图区域") + + left, top, right, bottom = self.capture_region + screenshot = pyautogui.screenshot(region=(left, top, right - left, bottom - top)) + return cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR) + + def scroll_screen(self, distance: int): + """在截图区域执行滚动""" + if not self.capture_region: + return + + # 将鼠标移动到截图区域中央 + left, top, right, bottom = self.capture_region + center_x = (left + right) // 2 + center_y = (top + bottom) // 2 + + pyautogui.moveTo(center_x, center_y) + time.sleep(0.1) + + # 执行滚动 + pyautogui.scroll(-distance) + logger.info(f"向下滚动 {distance} 像素") + + # 等待页面渲染 + time.sleep(self.config.SCROLL_DELAY) + + def check_duplicate(self, current_texts: List[str]) -> bool: + """ + 检查当前OCR结果是否与上一次相同 + 用于判断是否到达底部 + """ + if not self.previous_ocr_result: + return False + + # 简单比较:如果文字列表完全相同,认为是重复 + is_duplicate = current_texts == self.previous_ocr_result + + if is_duplicate: + logger.info("检测到OCR结果重复,可能已到达底部") + + return is_duplicate + + def save_result(self, scroll_index: int, image: np.ndarray, texts: List[str]): + """保存截图和OCR结果""" + timestamp = time.strftime("%Y%m%d_%H%M%S") + + # 保存图片 + image_path = Path(self.config.OUTPUT_DIR) / f"capture_{timestamp}_{scroll_index:03d}.png" + cv2.imwrite(str(image_path), image) + + # 保存OCR结果 + result = { + "index": scroll_index, + "timestamp": timestamp, + "image_path": str(image_path), + "texts": texts + } + self.all_results.append(result) + + logger.info(f"保存结果: {image_path}, 识别文字数: {len(texts)}") + + def save_final_result(self): + """保存所有结果到JSON文件""" + output_path = Path(self.config.OUTPUT_DIR) / f"all_results_{time.strftime('%Y%m%d_%H%M%S')}.json" + with open(output_path, 'w', encoding='utf-8') as f: + json.dump(self.all_results, f, ensure_ascii=False, indent=2) + logger.info(f"所有结果已保存到: {output_path}") + print(f"\n所有结果已保存到: {output_path}") + + def process_once(self) -> bool: + """ + 执行一次处理循环 + 返回False表示应该停止 + """ + logger.info(f"=== 第 {self.scroll_count + 1} 次截屏 ===") + print(f"\n>>> 第 {self.scroll_count + 1} 次截屏处理中...") + + # 1. 截取当前屏幕 + image = self.capture_screen() + logger.info(f"截图完成,尺寸: {image.shape[1]}x{image.shape[0]}") + + # 2. 分析图像,定位div边界 + analysis = self.image_analyzer.analyze(image) + + if not analysis.divs: + logger.warning("未检测到任何div区域,可能已到达底部或区域选择有误") + print("警告: 未检测到内容区域") + return False + + # 3. OCR提取文字 + current_texts = self.ocr_engine.recognize_divs(image, analysis.divs) + print(f"识别到 {len(current_texts)} 段文字") + for i, text in enumerate(current_texts[:3], 1): + preview = text[:50] + "..." if len(text) > 50 else text + print(f" [{i}] {preview}") + if len(current_texts) > 3: + print(f" ... 还有 {len(current_texts) - 3} 段文字") + + # 4. 保存结果 + self.save_result(self.scroll_count, image, current_texts) + + # 5. 判断是否到达底部(OCR结果重复) + if self.check_duplicate(current_texts): + print("\n>>> 检测到内容重复,已到达底部,处理完成 <<<") + return False + + self.previous_ocr_result = current_texts + + # 6. 计算滚动距离 + scroll_distance = self.image_analyzer.calculate_scroll_distance(analysis) + + # 7. 执行滚动 + self.scroll_screen(scroll_distance) + + self.scroll_count += 1 + + # 检查最大滚动次数 + if self.scroll_count >= self.config.MAX_SCROLL_COUNT: + logger.warning(f"达到最大滚动次数限制 ({self.config.MAX_SCROLL_COUNT})") + print(f"\n>>> 达到最大滚动次数限制,处理完成 <<<") + return False + + return True + + def run(self): + """主运行流程""" + print("=" * 60) + print("滚动截屏OCR工具") + print("=" * 60) + print(f"\n使用说明:") + print(f"1. 按下热键 {self.config.HOTKEY} 启动") + print(f"2. 按住鼠标左键拖动选择截图区域") + print(f"3. 程序将自动滚动截屏并进行OCR识别") + print(f"4. 当检测到重复内容时自动停止") + print(f"5. 结果将保存在 '{self.config.OUTPUT_DIR}' 目录") + print("\n" + "=" * 60) + + logger.info("程序启动,等待热键触发...") + print(f"\n>>> 等待热键 {self.config.HOTKEY} 启动... <<<") + + # 注册热键 + keyboard.add_hotkey(self.config.HOTKEY, self._on_hotkey) + + # 保持程序运行 + try: + while True: + time.sleep(0.1) + except KeyboardInterrupt: + logger.info("程序被用户中断") + print("\n>>> 程序已停止 <<<") + + def _on_hotkey(self): + """热键回调函数""" + logger.info("热键触发,开始处理") + print(f"\n{'='*60}") + print("热键已触发!") + + # 检查OCR服务 + print("\n>>> 检查OCR服务... <<<") + if not self.ocr_engine.check_service(): + if self.config.OCR_ENGINE == "umi": + print("✗ Umi-OCR服务未运行") + print("请先启动Umi-OCR软件并开启HTTP服务:") + print(" 1. 打开Umi-OCR") + print(" 2. 进入 设置 -> HTTP接口") + print(" 3. 勾选 '启用HTTP服务'") + print(f" 4. 确保端口为 {self.config.UMI_OCR_PORT}") + else: + print(f"✗ OCR服务未运行: {self.config.OCR_API_URL}") + return + + print("✓ OCR服务运行中") + + # 选择区域 + try: + self.capture_region = self.region_selector.select_region() + except Exception as e: + logger.error(f"区域选择失败: {e}") + print(f"区域选择失败: {e}") + return + + # 重置状态 + self.previous_ocr_result = [] + self.scroll_count = 0 + self.all_results = [] + + print(f"\n>>> 开始自动滚动截屏和OCR识别... <<<") + + # 循环处理 + try: + while self.process_once(): + pass + except Exception as e: + logger.error(f"处理过程中出错: {e}", exc_info=True) + print(f"\n错误: {e}") + + # 保存最终结果 + if self.all_results: + self.save_final_result() + print(f"\n共处理 {len(self.all_results)} 次截屏") + print(f"结果保存在: {Path(self.config.OUTPUT_DIR).absolute()}") + + print(f"\n{'='*60}") + print(">>> 等待下一次热键触发... <<<") + logger.info("处理完成,等待下一次热键触发") + + +def main(): + """入口函数""" + app = ScrollCaptureOCR() + app.run() + + +if __name__ == "__main__": + main() diff --git a/ocr_server_example.py b/ocr_server_example.py new file mode 100644 index 0000000..0694fc3 --- /dev/null +++ b/ocr_server_example.py @@ -0,0 +1,146 @@ +""" +OCR服务示例实现 +这是一个简单的OCR HTTP服务示例,使用 PaddleOCR 或 Tesseract 作为后端 +你可以根据实际需求修改此文件或使用其他OCR服务 + +启动方式: python ocr_server_example.py +服务地址: http://localhost:8000 +""" + +import base64 +import io +from typing import List + +try: + from flask import Flask, request, jsonify +except ImportError: + print("请先安装Flask: pip install flask") + raise + +try: + from PIL import Image +except ImportError: + print("请先安装Pillow: pip install pillow") + raise + +app = Flask(__name__) + +# 尝试导入OCR引擎,按优先级:PaddleOCR > Tesseract > 模拟 +ocr_engine = None +ocr_type = None + +try: + from paddleocr import PaddleOCR + ocr_engine = PaddleOCR( + use_angle_cls=True, + lang='ch', + show_log=False + ) + ocr_type = "paddle" + print("使用 PaddleOCR 引擎") +except ImportError: + try: + import pytesseract + ocr_engine = pytesseract + ocr_type = "tesseract" + print("使用 Tesseract OCR 引擎") + except ImportError: + ocr_type = "mock" + print("警告: 未找到OCR引擎,使用模拟模式") + print("建议安装 PaddleOCR: pip install paddleocr") + print("或安装 Tesseract + pytesseract: pip install pytesseract") + + +def recognize_with_paddle(image: Image.Image) -> List[str]: + """使用PaddleOCR识别""" + import numpy as np + img_array = np.array(image) + result = ocr_engine.ocr(img_array, cls=True) + + texts = [] + if result and result[0]: + for line in result[0]: + if line: + text = line[1][0] # 提取文字内容 + confidence = line[1][1] # 置信度 + if confidence > 0.5: # 过滤低置信度结果 + texts.append(text) + return texts + + +def recognize_with_tesseract(image: Image.Image) -> List[str]: + """使用Tesseract识别""" + text = ocr_engine.image_to_string(image, lang='chi_sim+eng') + # 按行分割 + lines = [line.strip() for line in text.split('\n') if line.strip()] + return lines + + +def recognize_mock(image: Image.Image) -> List[str]: + """模拟OCR(用于测试)""" + return ["[模拟OCR] 请安装实际的OCR引擎"] + + +def recognize_image(image: Image.Image) -> List[str]: + """根据配置的引擎进行识别""" + if ocr_type == "paddle": + return recognize_with_paddle(image) + elif ocr_type == "tesseract": + return recognize_with_tesseract(image) + else: + return recognize_mock(image) + + +@app.route('/ocr', methods=['POST']) +def ocr_endpoint(): + """ + OCR API端点 + 接收JSON: {"image": "base64编码的图片"} + 返回JSON: {"texts": ["识别到的文字1", "识别到的文字2", ...]} + """ + try: + data = request.get_json() + if not data or 'image' not in data: + return jsonify({"error": "缺少image字段"}), 400 + + # 解码base64图片 + img_base64 = data['image'] + img_data = base64.b64decode(img_base64) + image = Image.open(io.BytesIO(img_data)) + + # 转换为RGB(如果是RGBA或其他模式) + if image.mode != 'RGB': + image = image.convert('RGB') + + # 执行OCR + texts = recognize_image(image) + + return jsonify({ + "texts": texts, + "count": len(texts), + "engine": ocr_type + }) + + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +@app.route('/health', methods=['GET']) +def health_check(): + """健康检查端点""" + return jsonify({ + "status": "ok", + "engine": ocr_type + }) + + +if __name__ == '__main__': + print("=" * 60) + print("OCR HTTP 服务") + print("=" * 60) + print(f"OCR引擎: {ocr_type}") + print("API地址: http://localhost:8000/ocr") + print("健康检查: http://localhost:8000/health") + print("=" * 60) + print("\n启动服务中...") + app.run(host='0.0.0.0', port=8000, debug=False) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7c87a26 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +opencv-python>=4.8.0 +numpy>=1.24.0 +pillow>=10.0.0 +pyautogui>=0.9.54 +keyboard>=0.13.5 +mouse>=0.7.1 +requests>=2.31.0 +loguru>=0.7.0 diff --git a/umi_ocr_client.py b/umi_ocr_client.py new file mode 100644 index 0000000..3c215c9 --- /dev/null +++ b/umi_ocr_client.py @@ -0,0 +1,228 @@ +""" +Umi-OCR HTTP客户端 +用于调用Umi-OCR的argv接口进行OCR识别 + +Umi-OCR 接口文档: +- 服务地址: http://127.0.0.1:1224 +- argv接口: POST /argv +- 请求格式: JSON数组,如 ["--screenshot"] 或 ["--path", "图片路径"] +- 返回格式: 纯文本字符串 +""" + +import time +import requests +from typing import List, Optional, Union +from pathlib import Path +from loguru import logger + + +class UmiOCRClient: + """Umi-OCR HTTP客户端""" + + DEFAULT_HOST = "127.0.0.1" + DEFAULT_PORT = 1224 + + def __init__(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT): + self.host = host + self.port = port + self.base_url = f"http://{host}:{port}" + self.argv_url = f"{self.base_url}/argv" + + def is_service_running(self, timeout: float = 2.0) -> bool: + """ + 检查Umi-OCR HTTP服务是否运行 + + Args: + timeout: 请求超时时间(秒) + + Returns: + 服务是否可用 + """ + try: + response = requests.get( + self.base_url, + timeout=timeout + ) + return response.status_code == 200 + except requests.exceptions.ConnectionError: + logger.warning(f"无法连接到Umi-OCR服务: {self.base_url}") + return False + except requests.exceptions.Timeout: + logger.warning(f"连接Umi-OCR服务超时: {self.base_url}") + return False + except Exception as e: + logger.error(f"检查Umi-OCR服务状态时出错: {e}") + return False + + def recognize_screenshot(self, timeout: float = 30.0) -> Optional[str]: + """ + 调用Umi-OCR进行截图识别 + 等价于命令行: Umi-OCR --screenshot + + Args: + timeout: 请求超时时间(秒) + + Returns: + 识别到的文字,失败返回None + """ + if not self.is_service_running(): + logger.error("Umi-OCR服务未运行,请先启动Umi-OCR") + return None + + try: + data = ["--screenshot"] + response = requests.post( + self.argv_url, + headers={"Content-Type": "application/json"}, + json=data, + timeout=timeout + ) + response.raise_for_status() + + text = response.text + logger.info(f"截图OCR完成,识别到 {len(text)} 个字符") + return text + + except requests.exceptions.Timeout: + logger.error("Umi-OCR请求超时") + return None + except Exception as e: + logger.error(f"Umi-OCR截图识别失败: {e}") + return None + + def recognize_image(self, image_path: Union[str, Path], timeout: float = 30.0) -> Optional[str]: + """ + 调用Umi-OCR识别指定图片 + 等价于命令行: Umi-OCR --path "图片路径" + + Args: + image_path: 图片文件路径 + timeout: 请求超时时间(秒) + + Returns: + 识别到的文字,失败返回None + """ + if not self.is_service_running(): + logger.error("Umi-OCR服务未运行,请先启动Umi-OCR") + return None + + image_path = Path(image_path) + if not image_path.exists(): + logger.error(f"图片文件不存在: {image_path}") + return None + + try: + # 转换为绝对路径并标准化 + abs_path = str(image_path.resolve()) + data = ["--path", abs_path] + + response = requests.post( + self.argv_url, + headers={"Content-Type": "application/json"}, + json=data, + timeout=timeout + ) + response.raise_for_status() + + text = response.text + logger.info(f"图片OCR完成: {image_path.name}, 识别到 {len(text)} 个字符") + return text + + except requests.exceptions.Timeout: + logger.error("Umi-OCR请求超时") + return None + except Exception as e: + logger.error(f"Umi-OCR图片识别失败: {e}") + return None + + def recognize_images(self, image_paths: List[Union[str, Path]], timeout: float = 30.0) -> List[str]: + """ + 批量识别多张图片 + + Args: + image_paths: 图片路径列表 + timeout: 每张图片的请求超时时间(秒) + + Returns: + 识别结果列表,失败的图片对应位置为None + """ + results = [] + for path in image_paths: + result = self.recognize_image(path, timeout) + results.append(result) + # 添加小延迟避免请求过快 + time.sleep(0.1) + return results + + +def check_and_wait_for_service(client: UmiOCRClient, max_wait: float = 10.0, interval: float = 1.0) -> bool: + """ + 检查并等待Umi-OCR服务启动 + + Args: + client: UmiOCRClient实例 + max_wait: 最大等待时间(秒) + interval: 检查间隔(秒) + + Returns: + 服务是否可用 + """ + start_time = time.time() + while time.time() - start_time < max_wait: + if client.is_service_running(): + logger.info("Umi-OCR服务已就绪") + return True + logger.info("等待Umi-OCR服务启动...") + time.sleep(interval) + + logger.error(f"等待Umi-OCR服务超时({max_wait}秒)") + return False + + +# 便捷函数 +def recognize_screenshot(host: str = UmiOCRClient.DEFAULT_HOST, + port: int = UmiOCRClient.DEFAULT_PORT) -> Optional[str]: + """便捷函数:截图识别""" + client = UmiOCRClient(host, port) + return client.recognize_screenshot() + + +def recognize_image(image_path: Union[str, Path], + host: str = UmiOCRClient.DEFAULT_HOST, + port: int = UmiOCRClient.DEFAULT_PORT) -> Optional[str]: + """便捷函数:图片识别""" + client = UmiOCRClient(host, port) + return client.recognize_image(image_path) + + +if __name__ == "__main__": + # 测试代码 + print("=" * 60) + print("Umi-OCR 客户端测试") + print("=" * 60) + + client = UmiOCRClient() + + # 检查服务状态 + print("\n1. 检查服务状态...") + if client.is_service_running(): + print("✓ Umi-OCR服务运行中") + else: + print("✗ Umi-OCR服务未运行") + print("请先启动Umi-OCR软件并开启HTTP服务(设置->HTTP接口->启用)") + exit(1) + + # 测试截图识别 + print("\n2. 测试截图识别...") + print("请在5秒内准备好要截图的内容...") + time.sleep(5) + + result = client.recognize_screenshot() + if result: + print(f"✓ 识别成功,内容:\n{result[:200]}...") + else: + print("✗ 识别失败") + + print("\n" + "=" * 60) + print("测试完成") + print("=" * 60)