docs: 添加涉密文件自检工具实施计划

This commit is contained in:
2026-06-08 13:53:24 +08:00
commit 31161d9a5f
1838 changed files with 455407 additions and 0 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,183 @@
# ============================================
# =============== 命令行-客户端 ===============
# ============================================
import os
import sys
import time
import psutil
from umi_log import logger
from ..utils import pre_configs
from ..platform import Platform
# Get the creation time of a process.
def getPidTime(pid):
    """Return the creation time of process `pid` as a string.

    Returns an empty string when the process object cannot be created or
    queried; the failure is logged instead of being raised.
    """
    try:
        created = psutil.Process(pid).create_time()
    except psutil.NoSuchProcess:
        # The caller saw the pid as existing, yet no Process object could be built.
        logger.warning(
            "psutil.pid_exists(pid) 存在,但 Process 无法生成对象",
            exc_info=True,
            stack_info=True,
        )
        return ""
    except Exception:
        logger.error("psutil.Process(pid) error", exc_info=True, stack_info=True)
        return ""
    return str(created)
# Detect whether the application is already running.
def _isMultiOpen():
    """Return True when the instance recorded in the pre-configs is still alive.

    The recorded pid must exist AND its current creation time must equal the
    recorded one; otherwise False is returned.
    """
    last_pid = pre_configs.getValue("last_pid")
    last_ptime = pre_configs.getValue("last_ptime")
    if not psutil.pid_exists(last_pid):
        return False
    # Same pid and same start time as recorded: the recorded instance is running.
    if last_ptime == getPidTime(last_pid):
        return True
    return False
# Write the result text to a file named on the command line.
def _output(argv, argument, mode, text):
    """Handle one "<argument> <path>" pair of `argv` by writing `text` to the path.

    argv: argument list; the handled pair is removed from it in place.
    argument: the flag to look for (e.g. "--output").
    mode: file open mode ("w" to overwrite, "a" to append).
    text: content to write (UTF-8).

    Does nothing when `argument` is absent. Errors are logged and printed,
    never raised.
    """
    if argument not in argv:
        return
    # Pull the path that follows the flag out of argv.
    try:
        pos = argv.index(argument)
        target = argv[pos + 1]
        del argv[pos : pos + 2]
    except Exception as e:
        # logger writes to stderr, print writes to stdout
        logger.error(f"argument {argument} cannot be resolved.", exc_info=True)
        print(f"[Error] argument {argument} cannot be resolved. \n{e}")
        return
    # Relative paths are resolved against the PARENT of the working directory.
    if not os.path.isabs(target):
        base_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
        target = os.path.abspath(os.path.join(base_dir, target))
    try:
        with open(target, mode, encoding="utf-8") as fh:
            fh.write(text)
        print(f"\nSuccess output to file: {target}")
    except Exception as e:
        logger.error(f"failed to write file {target} .", exc_info=True)
        print(f"[Error] failed to write file {target} : \n{e}")
# Copy text to the clipboard without third-party libraries.
def _clip(text):
    """Copy `text` to the system clipboard (Windows only).

    The text is written to a temporary file which is then fed to the
    `clip` program through its standard input. On other operating systems
    an error line is printed. Failures are logged and printed, never raised.
    """
    import subprocess
    import platform
    import tempfile

    os_type = platform.system()
    try:
        if os_type == "Windows":
            # delete=False: the file must outlive the handle so it can be
            # reopened for reading; it is removed explicitly below.
            temp_file = tempfile.NamedTemporaryFile(
                delete=False, mode="w+", newline="\n"
            )
            try:
                with temp_file:
                    temp_file.write(text)
                # Feed the file through stdin instead of building a shell
                # string ("clip < path"): an unquoted path breaks as soon as
                # the temp directory contains spaces or shell metacharacters.
                with open(temp_file.name, "rb") as src:
                    subprocess.run(["clip"], stdin=src, check=True)
            finally:
                os.unlink(temp_file.name)  # always remove the temporary file
            print("\nSuccess copy to clipboard.")
        else:
            print(f"[Error] clip unsupported OS: {os_type}")
    except Exception as e:
        logger.error("failed to copy to clipboard.", exc_info=True)
        print(f"[Error] failed to copy to clipboard: {e}")
# Send the instruction list to the already-running process.
def _sendCmd(argv):
    """POST `argv` as JSON to the running instance and emit the reply.

    The reply (or an error description) is printed, written to any file
    requested through "-->", "--output", "-->>", "--output_append", and
    copied to the clipboard when "--clip" is present.
    """
    import json
    import urllib.request

    port = pre_configs.getValue("server_port")  # recorded server port
    url = f"http://127.0.0.1:{port}/argv"  # endpoint receiving the argv list
    errStr = f"Umi-OCR 已在运行HTTP跨进程传输指令失败。\n[Error] Umi-OCR is already running, HTTP cross process transmission instruction failed.\n{url}"
    try:
        payload = json.dumps(argv, ensure_ascii=True).encode("utf-8")
        req = urllib.request.Request(
            url, data=payload, headers={"Content-Type": "application/json"}
        )
        # Use an opener with an empty proxy table so no proxy is used.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        response = opener.open(req)
        if response.status != 200:
            res = f"{errStr}\nstatus_code: {response.status}"
        else:
            res = response.read().decode("utf-8")
    except Exception as e:
        res = f"{errStr}\nerror: {e}"
    # Emit the result.
    print(res)
    for flag, mode in (
        ("-->", "w"),
        ("--output", "w"),
        ("-->>", "a"),
        ("--output_append", "a"),
    ):
        _output(argv, flag, mode, res)
    if "--clip" in argv:
        _clip(res)
# Start a new process, then send the instruction to it.
def _newSend(argv):
    """Launch a new application process and forward `argv` once it is up.

    Polls the pre-configs until `_isMultiOpen()` reports the new process,
    then calls `_sendCmd(argv)`. Prints an error if the wait times out.
    """
    from umi_about import UmiAbout  # project information

    app_path = UmiAbout["app"]["path"]
    if not app_path:
        msg = "未找到 Umi-OCR 入口路径,无法启动新进程。请手动启动 Umi-OCR 后发送指令。\nUmi-OCR path not found, unable to start a new process."
        # NOTE(review): the standard `os` module has no MessageBox; presumably
        # it is attached elsewhere in the project — confirm.
        os.MessageBox(msg)
        return
    # Pass the force flag so the child does not recursively spawn processes.
    Platform.runNewProcess(appPath := app_path, " --force") if False else Platform.runNewProcess(app_path, " --force")
    # Wait for the service process to finish initializing.
    rounds = 60  # number of polling rounds
    pause = 0.5  # seconds between rounds
    for _ in range(rounds):
        time.sleep(pause)
        pre_configs.readConfigs()  # re-read the pre-configs
        if _isMultiOpen():  # the new process has registered itself
            _sendCmd(argv)
            return
    print(
        "服务进程初始化失败,等待时间超时。\n[Error] The service process initialization failed and the waiting time timed out."
    )
# Initialize command-line handling.
def initCmd():
    """Decide how this process should treat its command-line arguments.

    Returns True when this process should keep running as the application
    (its pid and creation time are recorded first), and False when the
    arguments were forwarded to another process instead.
    """
    argv = sys.argv[1:]
    force = "--force" in argv
    if force:
        argv.remove("--force")
    # Another instance is running: hand the arguments over to it.
    if _isMultiOpen():
        _sendCmd(argv)
        return False
    # Not running yet, but arguments were given: start a new process and
    # send the arguments to it.
    if argv and not force:
        _newSend(argv)
        return False
    # No arguments or forced start: run normally and refresh pid / ptime.
    now_pid = os.getpid()
    now_ptime = getPidTime(now_pid)
    pre_configs.setValue("last_pid", now_pid)
    pre_configs.setValue("last_ptime", now_ptime)
    return True

View File

@@ -0,0 +1,524 @@
# ===============================================
# =============== 命令行-解析和执行 ===============
# ===============================================
import time
import json
import argparse
from threading import Condition
from ..utils.call_func import CallFunc
from ..utils.file_finder import findFiles
from ..event_bus.pubsub_service import PubSubService # 发布/订阅管理器
# Command executor.
class _Actuator:
    """Executes command-line instructions against the running application."""

    def __init__(self):
        self.pyDict = {}  # Python modules callable by name
        self.qmlDict = {}  # QML modules callable by name
        self.tagPageConn = None  # reference to the tab-page connector

    # Initialize and collect information. Receives the QML module dict.
    def initCollect(self, qmlModuleDict):
        """Store the QML modules and fetch the tab-page connector instance."""
        qmlModuleDict = qmlModuleDict.toVariant()
        self.qmlDict.update(qmlModuleDict)
        # Get the page connector instance.
        from ..tag_pages.tag_pages_connector import TagPageConnObj

        self.tagPageConn = TagPageConnObj

    # ============================== Page management ==============================

    # Describe the page templates that can be created and the pages already created.
    def getAllPages(self):
        """Return a text table of opened pages and available templates."""
        TabViewManager = self.qmlDict["TabViewManager"]
        pageList = TabViewManager.getPageList().toVariant()
        infoStr = "All opened pages:\npage_index\tkey\ttitle\n"
        for index, value in enumerate(pageList):
            infoStr += f'{index}\t{value["ctrlKey"]}\t{value["info"]["title"]}\n'
        infoList = TabViewManager.getInfoList().toVariant()
        infoStr += (
            "\nAll page templates that can be opened:\ntemplate_index\tkey\ttitle\n"
        )
        for index, value in enumerate(infoList):
            infoStr += f'{index}\t{value["key"]}\t{value["title"]}\n'
        infoStr += "\nUsage of create a page:\n"
        infoStr += " Umi-OCR --add_page [template_index]\n"
        infoStr += "Usage of delete a page:\n"
        infoStr += " Umi-OCR --del_page [page_index]\n"
        infoStr += "Usage of query the modules that can be called:\n"
        infoStr += " Umi-OCR --all_modules\n"
        return infoStr

    # Create a page.
    def addPage(self, index):
        """Create a page from template `index`; return a status/error string."""
        try:
            index = int(index)
        except (TypeError, ValueError):
            return f"[Error] template_index must be integer, not {index}."
        TabViewManager = self.qmlDict["TabViewManager"]
        infoList = TabViewManager.getInfoList().toVariant()
        last = len(infoList) - 1
        if index < 0 or index > last:
            return f"[Error] template_index {index} out of range (0~{last})."
        return self.call("TabViewManager", "qml", "addTabPage", False, -1, index)

    # Delete a page.
    def delPage(self, index):
        """Delete the opened page `index`; return a status/error string."""
        try:
            index = int(index)
        except (TypeError, ValueError):
            return f"[Error] page_index must be integer, not {index}."
        TabViewManager = self.qmlDict["TabViewManager"]
        pageList = TabViewManager.getPageList().toVariant()
        last = len(pageList) - 1
        if index < 0 or index > last:
            return f"[Error] page_index {index} out of range (0~{last})."
        return self.call("TabViewManager", "qml", "delTabPage", False, index)

    # Create a page by template key.
    def addPageByKey(self, key):
        """Make sure a page for template `key` exists.

        Returns "[Success]" when the page's QML module is available, or an
        "[Error] ..." string otherwise.
        """
        module, _ = self.getModuleFromName(key, "qml")
        if module is None:
            tvm = self.qmlDict["TabViewManager"]
            infoList = tvm.getInfoList().toVariant()
            for i, v in enumerate(infoList):
                if v["key"] == key:
                    self.addPage(i)
                    break
            else:
                return f"[Error] Template {key} not found."
            # The page is created asynchronously; poll until its module shows up.
            for _ in range(10):
                time.sleep(0.5)
                module, _ = self.getModuleFromName(key, "qml")
                if module is not None:
                    break
        if module is None:
            return f"[Error] Unable to create template {key}."
        return "[Success]"

    # ============================== Dynamic module calls ==============================

    # Return all callable modules.
    def getModules(self):
        """Return {"py": {...}, "qml": {...}} of all callable modules."""
        pyd, qmld = {}, {}
        pages = self.tagPageConn.pages
        for p in pages:
            if pages[p]["qmlObj"]:
                qmld[p] = pages[p]["qmlObj"]
            if pages[p]["pyObj"]:
                pyd[p] = pages[p]["pyObj"]
        pyd.update(self.pyDict)
        qmld.update(self.qmlDict)
        return {"py": pyd, "qml": qmld}

    # Look up a module by (possibly abbreviated) name. type_: "py" / "qml"
    def getModuleFromName(self, moduleName, type_):
        """Return (module, resolved_name); module is None when nothing matches.

        An exact name wins; otherwise the first module whose name starts
        with `moduleName` is used.
        """
        d = self.getModules()[type_]
        if moduleName in d:
            return d[moduleName], moduleName
        for name in d:  # a leading fragment of the name also matches
            if name.startswith(moduleName):
                return d[name], name
        return None, moduleName

    # Return help text listing all callable modules.
    def getModulesHelp(self):
        """Return help text listing every Python and QML module name."""
        modules = self.getModules()
        text = "\nPython modules: (Usage: Umi-OCR --call_py [module name])\n"
        for k in modules["py"]:
            text += f" {k}\n"
        text += "\nQml modules: (Usage: Umi-OCR --call_qml [module name])\n"
        for k in modules["qml"]:
            text += f" {k}\n"
        text += "\nTips: module name can only write the first letters, such as [ScreenshotOCR_1] → [Scr]"
        return text

    # Return help text listing the functions of one module.
    def getModuleFuncsHelp(self, moduleName, type_):
        """Return help text listing the public callables of one module."""
        module, moduleName = self.getModuleFromName(moduleName, type_)
        typeStr = "Python" if type_ == "py" else "qml"
        if not module:
            return f'[Error] {typeStr} module "{moduleName}" non-existent.'
        funcs = [
            func
            for func in vars(type(module)).keys()
            if callable(getattr(module, func))
        ]
        text = f'All functions in {typeStr} module "{moduleName}":\n'
        for f in funcs:
            f = str(f)
            if not f.startswith("_"):
                text += f" {f}\n"
        # The usage line must name the flag that matches the module type.
        flag = "--call_py" if type_ == "py" else "--call_qml"
        text += f"Usage: Umi-OCR {flag} {moduleName} --func [function name]\n"
        return text

    # Call a module function. type_: "py" / "qml".
    # thread: True = run synchronously on the calling (worker) thread and
    # return the result; False = dispatch to the main thread.
    def call(self, moduleName, type_, funcName, thread, *paras):
        """Call `funcName` of a module; return its result or a status string."""
        module, moduleName = self.getModuleFromName(moduleName, type_)
        typeStr = "Python" if type_ == "py" else "qml"
        if not module:
            return f'[Error] {typeStr} module "{moduleName}" non-existent.'
        func = getattr(module, funcName, None)
        if not func:
            return f'[Error] func "{funcName}" not exist in {typeStr} module "{moduleName}".'
        try:
            if thread:  # run here and return the result
                return func(*paras)
            else:  # run in the main thread and return a marker text
                CallFunc.now(func, *paras)
                return f'Calling "{funcName}" in main thread.'
        except Exception as e:
            return f'[Error] calling {typeStr} module "{moduleName}" - "{funcName}" {paras}: {e}'

    # ============================== Convenience commands ==============================

    # Control the main window.
    def ctrlWindow(self, show, hide, quit):
        """Show, hide or quit the main window; the first true flag wins."""
        if show:
            self.call("MainWindow", "qml", "setVisibility", False, True)
            return "Umi-OCR show."
        elif hide:
            self.call("MainWindow", "qml", "setVisibility", False, False)
            return "Umi-OCR hide."
        elif quit:
            self.call("MainWindow", "qml", "quit", False)
            return "Umi-OCR quit."

    # Quick OCR from screenshot / clipboard / paths, returning the result text.
    def quick_ocr(self, ss, clip, paras):
        """Run a quick OCR and block until its result arrives.

        ss: True for screenshot OCR (`paras` may hold "screen=N" / "rect=x,y,w,h").
        clip: True for clipboard OCR.
        Otherwise `paras` are image paths.
        Returns the recognized text or an "[Error] ..." / "[Message] ..." string.
        """
        # 1. Make sure the screenshot tab page exists.
        msg = self.addPageByKey("ScreenshotOCR")
        if msg != "[Success]":
            return msg
        # 2. Subscribe to <<ScreenshotOcrEnd>>.
        isOcrEnd = False
        resList = []
        condition = Condition()  # thread synchronizer

        def onOcrEnd(recentResult):
            nonlocal isOcrEnd, resList
            isOcrEnd = True
            resList = recentResult
            with condition:  # release the blocked waiter
                condition.notify()

        PubSubService.subscribe("<<ScreenshotOcrEnd>>", onOcrEnd)
        # Every exit path below, including the early error returns, must
        # remove the subscription again; otherwise the callback stays
        # registered after this call has finished.
        try:
            # 3. Trigger the OCR on the screenshot tab page.
            if ss:  # screenshot
                if not paras:  # no parameters: manual screenshot
                    self.call("ScreenshotOCR", "qml", "screenshot", False)
                else:  # automatic: umi-ocr --screenshot screen=0 rect=0,100,500,200
                    rect = [0, 0, 0, 0]  # screenshot rectangle
                    screen = 0  # display index
                    para_args = []
                    try:
                        for para in paras:  # split on whitespace
                            para_args.extend(para.split())
                        for part in para_args:
                            if part.startswith("rect="):
                                rect_values = part[len("rect=") :].split(",")
                                rect_values = [int(v) for v in rect_values]
                                # fill in as many rect values as were given
                                rect[: len(rect_values)] = rect_values
                            elif part.startswith("screen="):
                                screen = int(part[len("screen=") :])
                        self.call(
                            "ScreenshotOCR", "qml", "autoScreenshot", False, rect, screen
                        )
                    except Exception as e:
                        return f"[Error] {e}"
            elif clip:  # clipboard
                self.call("ScreenshotOCR", "qml", "paste", False)
            else:  # paths
                if not paras:
                    return "[Error] Paths is empty."
                paths = findFiles(paras, "image", True)  # recursive search
                if not paths:
                    return "[Error] No valid path."
                self.call("ScreenshotOCR", "qml", "ocrPaths", False, paths)
            # 4. Block until the task has finished.
            with condition:
                while not isOcrEnd:
                    condition.wait()
        finally:
            PubSubService.unsubscribe("<<ScreenshotOcrEnd>>", onOcrEnd)
        # 5. Convert the result list to text.
        text = ""
        for r in resList:  # one entry per image
            if text and not text.endswith("\n"):
                text += "\n"  # separate results that did not end with a newline
            if r["code"] == 100:
                for d in r["data"]:  # one entry per text block
                    text += d["text"] + d["end"]
            elif r["code"] != 101 and isinstance(r["data"], str):
                text += r["data"]
        if not text:
            text = "[Message] No text in OCR result."
        return text

    # Create a QR code.
    def qrcode_create(self, paras):
        """Create a QR code image.

        paras: [text, save_path], optionally followed by one size (used for
        both width and height) or by width and height.
        Returns a status or "[Error] ..." string.
        """
        if len(paras) < 2:
            return (
                '[Error] Not enough arguments passed! Must pass "text" "save_image.jpg"'
            )
        text, path = paras[0], paras[1]
        # A non-numeric size is reported as an error string like every other
        # failure of this command instead of raising.
        try:
            if len(paras) == 3:
                w = h = int(paras[2])
            elif len(paras) == 4:
                w, h = int(paras[2]), int(paras[3])
            else:
                w = h = 0
        except (TypeError, ValueError) as e:
            return f"[Error] {str(e)}"
        try:
            from ..mission.mission_qrcode import MissionQRCode

            pil = MissionQRCode.createImage(
                text,
                format="QRCode",  # code format
                w=w,  # width / height
                h=h,
                quiet_zone=-1,  # margin width
                ec_level=-1,  # error-correction level
            )
            if isinstance(pil, str):
                return pil
            pil.save(path)
            return f"Successfully saved to {path}"
        except Exception as e:
            return f"[Error] {str(e)}"

    # Read QR codes.
    def qrcode_read(self, paras):
        """Read the codes in the images found under `paras`; return the text."""
        if len(paras) < 1:
            return '[Error] Not enough arguments passed! Must pass "image_to_recognize.jpg"'
        try:
            from ..mission.mission_qrcode import MissionQRCode
            from PIL import Image
        except Exception as e:
            return f"[Error] {str(e)}"
        resText = ""
        paths = findFiles(paras, "image", True)  # recursive image search
        for index, path in enumerate(paths):
            if index != 0:
                resText += "\n"
            try:
                pil = Image.open(path)
                res = MissionQRCode.addMissionWait({}, [{"pil": pil}])
                res = res[0]["result"]
                if res["code"] == 100:
                    resText += "\n".join(d["text"] for d in res["data"])
                elif res["code"] == 101:
                    resText += "No code in image."
                else:
                    resText += f"[Error] Code: {res['code']}\nMessage: {res['data']}"
            except Exception as e:
                resText += f"[Error] {str(e)}"
        return resText


CmdActuator = _Actuator()
# Command parser.
class _Cmd:
    """Parses an argv list and dispatches it to the command executor."""

    def __init__(self):
        self._parser = None

    def init(self):
        """Build the argument parser on first use; later calls do nothing."""
        if self._parser:
            return
        parser = argparse.ArgumentParser(prog="Umi-OCR")
        self._parser = parser

        def flag(name, help_text):
            # Boolean switch: present means True.
            parser.add_argument(name, action="store_true", help=help_text)

        # Convenience commands.
        flag("--show", "Make the app appear in the foreground.")
        flag("--hide", "Hide app in the background.")
        flag("--quit", "Quit app.")
        flag("--screenshot", "Screenshot OCR and output the result.")
        flag("--clipboard", "Clipboard OCR and output the result.")
        flag("--path", "OCR the image in path and output the result.")
        flag(
            "--qrcode_create",
            'Create a QR code from the text. Use --qrcode_create "text" "save_image.jpg"',
        )
        flag(
            "--qrcode_read",
            'Read the QR code. Use --qrcode_read "image_to_recognize.jpg"',
        )
        flag("--reload", 'Reload settings from the configuration file ".settings"')
        # Page management.
        flag("--all_pages", "Output all template and page information.")
        for page_option in ("--add_page", "--del_page"):
            parser.add_argument(
                page_option, type=int, help="usage: Umi-OCR --all_pages"
            )
        # Function calls.
        flag("--all_modules", "Output all module names that can be called.")
        parser.add_argument("--call_py", help="Calling a function on a Python module.")
        parser.add_argument("--call_qml", help="Calling a function on a Qml module.")
        parser.add_argument("--func", help="The name of the function to be called.")
        flag(
            "--thread",
            "The function will be called on the child thread and return the result, but it may be unstable or cause QML to crash.",
        )
        # Output.
        flag("--clip", "Copy the results to the clipboard.")
        parser.add_argument(
            "--output",
            help="The path to the file where results will be saved. (overwrite)",
        )
        parser.add_argument(
            "--output_append",
            help="The path to the file where results will be saved. (append)",
        )
        parser.add_argument("-->", help='"-->" equivalent to "--output"')
        parser.add_argument("-->>", help='"-->>" equivalent to "--output_append"')
        parser.add_argument("paras", nargs="*", help="parameters of [--func].")

    # Analyse the instruction; return the parsed object or an error/help string.
    def parse(self, argv):
        """Parse `argv`; return the argparse namespace, or a string on help/error."""
        self.init()
        # Special cases.
        if "-h" in argv or "--help" in argv:  # help
            return self._parser.format_help()
        if len(argv) == 0:  # empty instruction
            CmdActuator.ctrlWindow(True, False, False)  # show the main window
            return self._parser.format_help()
        # Regular parsing. argparse signals bad input with SystemExit.
        try:
            return self._parser.parse_args(argv)
        except (SystemExit, Exception) as e:
            return f"Your argv: {argv}\n[Error]: {e}\nusage: Umi-OCR --help"

    # Execute the instruction; return the result string.
    def execute(self, argv):
        """Parse and run `argv`; return the command's result."""
        args = self.parse(argv)
        if isinstance(args, str):
            return args
        if args.all_modules:
            return CmdActuator.getModulesHelp()
        # Convenience commands.
        if args.show or args.hide or args.quit:  # main window control
            return CmdActuator.ctrlWindow(args.show, args.hide, args.quit)
        if args.screenshot or args.clipboard or args.path:  # quick OCR
            return CmdActuator.quick_ocr(args.screenshot, args.clipboard, args.paras)
        if args.qrcode_create:  # write a QR code
            return CmdActuator.qrcode_create(args.paras)
        if args.qrcode_read:  # read a QR code
            return CmdActuator.qrcode_read(args.paras)
        if args.reload:  # reload the settings
            PubSubService.publish("<<settingsReload>>")
            return "Settings reload."
        # Page management.
        if args.all_pages:
            return CmdActuator.getAllPages()
        if args.add_page is not None:
            return CmdActuator.addPage(args.add_page)
        if args.del_page is not None:
            return CmdActuator.delPage(args.del_page)
        # Dynamic module calls: Python modules are checked before QML ones.
        for kind, module_name in (("py", args.call_py), ("qml", args.call_qml)):
            if not module_name:
                continue
            if args.func:
                return CmdActuator.call(
                    module_name,
                    kind,
                    args.func,
                    args.thread,
                    *self.format_paras(args.paras),
                )
            return CmdActuator.getModuleFuncsHelp(module_name, kind)

    # Format the positional parameters.
    def format_paras(self, paras):
        """Convert each string to int, float or a JSON value when possible."""
        attempts = (
            (int, ValueError),
            (float, ValueError),
            (json.loads, json.JSONDecodeError),
        )

        def convert_param(param):
            for caster, failure in attempts:
                try:
                    return caster(param)
                except failure:
                    continue
            return param  # keep the original string

        return [convert_param(p) for p in paras]


CmdServer = _Cmd()

View File

@@ -0,0 +1,567 @@
import os
import re
import json
import time
import shutil
import zipfile
from urllib.parse import urlparse
from uuid import uuid4
from PySide2.QtCore import QMutex
from typing import Dict
from umi_log import logger
from call_func import CallFunc
from .bottle import request, static_file, HTTPError
from .ocr_server import get_ocr_options
from ..ocr.output import Output
from ..mission.mission_doc import MissionDOC, DocSuf
from ..utils.utils import initConfigDict
from ..ocr.output.tools import getDataText
UPLOAD_DIR = "./temp_doc" # temporary directory for uploaded files
TEMP_FILE_RETENTION_DURATION = 24 # how long task temp files are kept, in hours
TEMP_FILE_CLEANUP_INTERVAL = 0.5 # interval between automatic temp-file cleanups, in hours
# Get the option template dictionary for document tasks.
def get_doc_options():
    """Return the OCR options (without the format option) plus document options.

    The combined dictionary is passed through `initConfigDict` before
    being returned.
    """

    def int_option(title, tool_tip, default):
        # Template entry for an integer-valued option.
        return {
            "title": title,
            "toolTip": tool_tip,
            "default": default,
            "isInt": True,
        }

    opts = get_ocr_options(is_format=False)
    opts["tbpu.ignoreRangeStart"] = int_option(
        "忽略区域起始", "忽略区域生效的页数范围起始。从1开始。", 1
    )
    opts["tbpu.ignoreRangeEnd"] = int_option(
        "忽略区域结束", "忽略区域生效的页数范围结束。可以用负数表示倒数第X页。", -1
    )
    opts["pageRangeStart"] = int_option(
        "OCR页数起始", "OCR的页数范围起始。从1开始。", 1
    )
    opts["pageRangeEnd"] = int_option(
        "OCR页数结束", "OCR的页数范围结束。可以用负数表示倒数第X页。", -1
    )
    opts["pageList"] = {
        "title": "OCR页数列表",
        "toolTip": "数组,可指定单个或多个页数。例:[1,2,5]表示对第1、2、5页进行OCR。如果与页数范围同时填写则 pageList 优先。",
        "default": [],
        "type": "var",
    }
    opts["password"] = {
        "title": "密码",
        "toolTip": "如果文档已加密,则填写文档密码。",
        "default": "",
    }
    extraction_modes = [
        ["mixed", "混合OCR/原文本"],
        ["fullPage", "整页强制OCR"],
        ["imageOnly", "仅OCR图片"],
        ["textOnly", "仅拷贝原有文本"],
    ]
    opts["doc.extractionMode"] = {
        "title": "内容提取模式",
        "toolTip": "若一页文档既存在图片又存在文本,如何进行处理。",
        "optionsList": extraction_modes,
    }
    return initConfigDict(opts)  # normalize the template
UPLOAD_DIR = os.path.abspath(UPLOAD_DIR) # make the upload path absolute
TEMP_FILE_RETENTION_DURATION *= 3600 # convert hours to seconds
TEMP_FILE_CLEANUP_INTERVAL *= 3600 # convert hours to seconds
# Exception type for document task units.
class DocUnitError(Exception):
    """Error carrying a payload in its `data` attribute."""

    def __init__(self, data):
        super().__init__(data)
        self.data = data  # payload supplied by the raiser
# A single document task unit.
class _DocUnit:
    """One uploaded document together with its OCR mission and results."""

    def __init__(
        self, dir_id, dir_path, origin_path, origin_name, origin_prefix, options
    ):
        """Validate the document, fill in default options and submit the mission.

        Raises DocUnitError (its `data` is a {"code", "data"} dict) when the
        document cannot be read, needs a password, or cannot be submitted.
        Missing keys of `options` are filled in place with their defaults.
        """
        # Extract the document information.
        doc_info = MissionDOC.getDocInfo(origin_path)
        if "error" in doc_info.keys():
            raise DocUnitError({"code": 201, "data": doc_info["error"]})
        # Fill in missing default parameters.
        default = get_doc_options()
        for key in default:
            if key not in options:
                options[key] = default[key]["default"]
        # Extract parameters.
        page_range = [options["pageRangeStart"], options["pageRangeEnd"]]
        page_list = options["pageList"]
        if page_list:  # indices start at 1 for the user, at 0 internally
            page_list = [x - 1 for x in page_list]
        password = options["password"]
        if not password and doc_info["is_encrypted"]:
            raise DocUnitError(
                {
                    "code": 202,
                    "data": "The doc is encrypted, please fill in the password.",
                }
            )
        # Collect the prefixed entries of options as the mission argument dict.
        prefixes = ("ocr.", "doc.", "tbpu.")
        doc_argd = {k: v for k, v in options.items() if k.startswith(prefixes)}
        # Mission information.
        msnInfo = {
            "onStart": self._onStart,
            "onGet": self._onGet,
            "onEnd": self._onEnd,
            "argd": doc_argd,
        }
        # Initialize all instance state BEFORE submitting the mission. The
        # callbacks registered above use this state; assigning it after
        # submission would let an early callback find attributes missing,
        # or have its updates overwritten by the late initialization.
        self.password = password
        self.dir_id = dir_id
        self.dir_path = dir_path
        self.origin_prefix = origin_prefix
        self.origin_name = origin_name
        self.origin_path = origin_path
        self.msnID = ""  # mission id, set after submission
        self.results = {}  # raw results, keyed by page number
        self.pages_count = 0  # total pages of the mission, set after submission
        self.processed_count = 0  # pages processed so far
        self.unread_list = []  # pages whose result has not been read yet
        self.is_done = False  # whether the mission has finished
        self.state = "waiting"  # waiting / running / success / failure
        self.message = ""  # failure message, if the mission failed
        self.start_timestamp = time.time()  # start timestamp
        self.end_timestamp = time.time()  # end timestamp, refreshed on finish
        self._mutex = QMutex()  # main lock
        # Submit the mission.
        msg = MissionDOC.addMission(
            msnInfo, origin_path, page_range, page_list, password
        )
        if not msg:
            raise DocUnitError({"code": 203, "data": "addMission unknow."})
        if msg.startswith("["):
            raise DocUnitError({"code": 204, "data": msg})
        self.msnID = msg
        # "pageList" is read back from msnInfo after submission.
        self.pages_count = len(msnInfo["pageList"])

    # ========================= Interface =========================

    # Get the results.
    def get_result(
        self,
        is_data=False,  # True: include the recognized content in "data"
        format="dict",  # content format: "dict" or "text"
        is_unread=False,  # True: return only content not read before
    ):
        """Return a status dict, optionally with the recognized content."""
        self._mutex.lock()
        # try/finally: an exception while assembling the data must not
        # leave the mutex locked.
        try:
            data = {
                "code": 100,
                "processed_count": self.processed_count,
                "pages_count": self.pages_count,
                "is_done": self.is_done,
                "state": self.state,
                "data": [],
            }
            if self.state == "failure":
                data["message"] = self.message
            if is_data:
                if is_unread:  # incremental: only unread pages, then mark read
                    datas = [self.results[page] for page in self.unread_list]
                    self.unread_list = []
                else:  # full: every page
                    datas = list(self.results.values())
                if format == "text":  # join the text of successful pages
                    datas = "".join(
                        getDataText(res["data"]) for res in datas if res["code"] == 100
                    )
                data["data"] = datas
            return data
        finally:
            self._mutex.unlock()

    # Get the output files.
    def get_files(
        self,
        base_url,  # base url for downloading
        file_types=None,  # output file types, default ["pdfLayered"]; options:
        # txt, txtPlain, jsonl, csv, pdfLayered, pdfOneLayer
        ignore_blank=True,  # ignore blank pages
    ):
        """Generate the requested output files and return their download url.

        Returns {"code": 100, "data": url, "name": file_name} on success or
        {"code": <other>, "data": reason} on failure.
        """
        if file_types is None:
            file_types = ["pdfLayered"]
        if not self.is_done:
            return {"code": 201, "data": f"{self.msnID} 任务尚未结束,无法获取文件"}
        if not self.state == "success":
            return {"code": 201, "data": f"{self.msnID} 任务处理失败,无法获取文件"}
        if not isinstance(file_types, list) or not isinstance(ignore_blank, bool):
            return {
                "code": 202,
                "data": f"参数类型错误: file_types={file_types} , ignore_blank={ignore_blank}",
            }
        # Delete the files of previous calls (everything but the original).
        for file_path in self._generated_files():
            os.remove(file_path)
        # Prepare the output arguments.
        startDatetime = time.strftime(
            r"%Y-%m-%d %H:%M:%S", time.localtime(self.start_timestamp)
        )
        outputArgd = {
            "outputDir": self.dir_path,  # output directory
            "outputDirType": "specify",
            "outputFileName": "[OCR]_" + self.origin_prefix,  # file name prefix
            "startDatetime": startDatetime,  # start date
            "ignoreBlank": ignore_blank,  # ignore blank pages
            "originPath": self.origin_path,  # original file
            "password": self.password,  # document password
        }
        # Create the outputters.
        output = []
        try:
            for f in file_types:
                output.append(Output[f](outputArgd))
        except Exception as e:
            return {"code": 203, "data": f"初始化输出器失败。{e}"}
        # Write every page to every outputter, then save.
        for o in output:
            for res in self.results.values():
                try:
                    o.print(res)
                except Exception as e:
                    return {"code": 204, "data": f"输出失败:{o}\n{e}"}
            try:
                o.onEnd()  # save
            except Exception as e:
                return {"code": 205, "data": f"保存失败:{o}\n{e}"}
        # Collect the newly generated files.
        download_paths = self._generated_files()
        if not download_paths:
            return {"code": 206, "data": "未找到生成的文件"}
        elif len(download_paths) == 1:
            download_name = os.path.basename(download_paths[0])
        else:
            # Several files: pack them into one zip.
            download_name = f"[OCR]_{self.origin_prefix}.zip"
            zip_path = os.path.join(self.dir_path, download_name)
            try:
                with zipfile.ZipFile(zip_path, "w") as zipf:
                    for p in download_paths:
                        zipf.write(p, os.path.basename(p))
            except Exception as e:
                return {"code": 207, "data": f"无法打包zip{e}"}
        # Compose the download address.
        url = f"{base_url}/api/doc/download/{self.dir_id}/{download_name}"
        return {"code": 100, "data": url, "name": download_name}

    # List the files in the task directory other than the original document.
    def _generated_files(self):
        paths = []
        for filename in os.listdir(self.dir_path):
            file_path = os.path.join(self.dir_path, filename)
            if filename != self.origin_name and os.path.isfile(file_path):
                paths.append(file_path)
        return paths

    # Clean up the task.
    def clear(self):
        """Stop the mission if still running and delete the task directory."""
        if not self.is_done:
            MissionDOC.stopMissionList([self.msnID])
            time.sleep(0.1)  # give the mission a moment to wind down
        # A PermissionError is taken to mean the directory is still in use;
        # wait and retry, up to 20 times.
        for _ in range(20):
            try:
                if os.path.exists(self.dir_path):
                    shutil.rmtree(self.dir_path)
                return  # deleted (or already gone): stop retrying
            except PermissionError:
                time.sleep(1)

    # ========================= Asynchronous mission callbacks =========================

    def _onStart(self, msnInfo):  # a document starts
        self.state = "running"

    def _onGet(self, msnInfo, page, res):  # one page of a document is done
        page += 1  # back to 1-based page numbers
        res["page"] = page
        res["path"] = f"{self.origin_name} - {page}"
        res["fileName"] = f"{self.origin_name} - {page}"
        self._mutex.lock()
        try:
            self.results[page] = res
            self.processed_count += 1
            self.unread_list.append(page)
        finally:
            self._mutex.unlock()

    def _onEnd(self, msnInfo, msg):  # a document is finished
        # msg: [Success] [Warning] [Error]
        self._mutex.lock()
        try:
            self.is_done = True
            if msg == "[Success]":
                self.state = "success"
            else:
                self.state = "failure"
                self.message = msg
            self.end_timestamp = time.time()  # refresh the end timestamp
        finally:
            self._mutex.unlock()
# Manages all document task units.
class _DocUnitManagerClass:
    """Registry of document task units, keyed by id, with timed cleanup."""

    def __init__(self):
        self.doc_units: Dict[str, _DocUnit] = {}

    # Add a task unit.
    def add(self, id: str, unit: _DocUnit):
        self.doc_units[id] = unit

    # Get a task unit, or None when the id is unknown.
    def get(self, id: str):
        return self.doc_units.get(id)

    # Manually clean up one task. Returns True if the id existed.
    def clear(self, id: str):
        if id not in self.doc_units:
            return False
        self.doc_units[id].clear()
        del self.doc_units[id]
        return True

    # Automatic cleanup.
    def auto_clear(self):
        """Clear units that ended longer ago than the retention time, then reschedule."""
        if self.doc_units:
            now = time.time()  # current timestamp
            expired = [
                key
                for key, unit in self.doc_units.items()
                if now - unit.end_timestamp > TEMP_FILE_RETENTION_DURATION
            ]
            for key in expired:
                logger.info(f"超时自动清理 {key}")
                self.doc_units[key].clear()  # clean up the files
            for key in expired:
                del self.doc_units[key]  # drop the task object
        # Schedule the next cleanup.
        CallFunc.delay(self.auto_clear, TEMP_FILE_CLEANUP_INTERVAL)


_DocUnitManager = _DocUnitManagerClass()
# 路由函数
def init(UmiWeb):
# 清空上传文件目录内容
if os.path.exists(UPLOAD_DIR):
shutil.rmtree(UPLOAD_DIR)
os.makedirs(UPLOAD_DIR)
# 启动自动清理循环
_DocUnitManager.auto_clear()
@UmiWeb.route("/api/doc/get_options")
def _get_options_json():
opts = get_doc_options()
res = json.dumps(opts)
return res
"""
上传文档方法POST
参数:文档内容
返回值:
成功: {"code": 100, "data": "任务id"}
失败: {"code": 不是100的值, "data": "失败原因"}
"""
@UmiWeb.route("/api/doc/upload", method="POST")
def _upload():
# 1. 获取上传文件
upload = request.files.get("file")
if not upload:
return {"code": 101, "data": "[Error] No file was uploaded."}
# 2. 获取文件名,检查文件后缀
# origin_name = upload.filename
# 将原始文件名转为合法文件名
def filename_convert(raw_filename: str):
# 去除前后的空格
raw_filename = raw_filename.strip()
# 定义非法字符
illegal_chars = r'[\/:*?"<>|]'
# 替换非法字符
sanitized_filename = re.sub(illegal_chars, "_", raw_filename)
# 限制文件名长度为255个字符
max_length = 255
if len(sanitized_filename) > max_length:
sanitized_filename = sanitized_filename[:max_length]
return sanitized_filename
try:
origin_name = filename_convert(upload.raw_filename)
origin_prefix, ext = os.path.splitext(origin_name)
ext = ext.lower()
except Exception as e:
return {"code": 102, "data": f"[Error] Unable to obtain raw_filename: {e}"}
if ext not in DocSuf:
return {
"code": 103,
"data": f"[Error] File extension '{ext}' is not allowed.",
}
# 3. 指定文件编号。创建对应目录,保存文件到 ./temp/dir_id/原文件名
dir_id = str(uuid4())
dir_path = os.path.join(UPLOAD_DIR, f"{dir_id}")
dir_path = os.path.abspath(dir_path) # 将路径转为绝对路径
file_path = os.path.join(dir_path, origin_name)
# 安全检测: file_path 是否在 UPLOAD_DIR 中
if os.path.commonpath([UPLOAD_DIR]) != os.path.commonpath(
[UPLOAD_DIR, file_path]
):
return {"code": 104, "data": f"[Error] Unauthorized path"}
try:
if os.path.exists(dir_path): # 如果目录存在,则删除该目录
shutil.rmtree(dir_path)
os.makedirs(dir_path) # 重新创建目录
except Exception as e:
return {"code": 105, "data": f"[Error] Failed to create dir_id: {e}"}
try:
upload.save(file_path, overwrite=True) # 保存文件
except Exception as e:
return {"code": 106, "data": f"[Error] Failed to save file: {e}"}
# 4. 提取 options 参数
options = request.forms.get("json")
if options:
try:
options = json.loads(options)
except Exception as e:
shutil.rmtree(dir_path)
return {
"code": 107,
"data": f"[Error] Invalid JSON format: {options} | {e}",
}
if not isinstance(options, dict):
options = {}
# 5. 构造任务对象
try:
doc_unit = _DocUnit(
dir_id, dir_path, file_path, origin_name, origin_prefix, options
)
msnID = doc_unit.msnID
_DocUnitManager.add(msnID, doc_unit)
logger.info(f"添加 HTTP 文档任务: {origin_name}")
return {"code": 100, "data": msnID}
except DocUnitError as e:
shutil.rmtree(dir_path)
return e.data
except Exception as e:
shutil.rmtree(dir_path)
return {"code": 108, "data": f"[Error] Failed to submit mission: {e}"}
"""
获取结果方法POST
json参数
"id"="", # 任务ID
"is_data"=False, # True 时返回识别内容data
"format"="dict", # 识别内容格式, "dict", "text"
"is_unread"=False, # True 时只返回未读过的识别内容
返回值: {}
"""
@UmiWeb.route("/api/doc/result", method="POST")
def _result():
try:
user_data = request.json
except Exception as e:
return {"code": 101, "data": f"请求无法解析为json。"}
if not user_data or "id" not in user_data:
return {"code": 102, "data": f"未填写id。"}
msnID = user_data["id"]
doc_unit = _DocUnitManager.get(msnID)
if not doc_unit:
return {"code": 103, "data": f"任务 {msnID} 不存在。"}
is_data = user_data.get("is_data", False)
format = user_data.get("format", "dict")
is_unread = user_data.get("is_unread", True)
return doc_unit.get_result(is_data, format, is_unread)
"""
获取文件方法POST
json参数
"id"="", # 任务ID
"file_types"=["pdfLayered"], # 输出文件类型,可选:
# ["txt", "txtPlain", "jsonl", "csv", "pdfLayered", "pdfOneLayer"]
"ignore_blank"=True, # 忽略空白页数
返回值: {}
"""
@UmiWeb.route("/api/doc/download", method="POST")
def _download_build():
try:
user_data = request.json
except Exception as e:
return {"code": 101, "data": f"请求无法解析为json。"}
if not user_data or "id" not in user_data:
return {"code": 102, "data": f"未填写id。"}
msnID = user_data["id"]
doc_unit = _DocUnitManager.get(msnID)
if not doc_unit:
return {"code": 103, "data": f"任务 {msnID} 不存在。"}
file_types = user_data.get("file_types", ["pdfLayered"])
ignore_blank = user_data.get("ignore_blank", True)
parsed_url = urlparse(request.url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
return doc_unit.get_files(base_url, file_types, ignore_blank)
# 下载文件
@UmiWeb.route("/api/doc/download/<id>/<download_name>")
def _download_get(id, download_name):
    """Serve ``download_name`` from the task directory ``UPLOAD_DIR/<id>``.

    Raises:
        HTTPError: when the requested path resolves outside ``UPLOAD_DIR``.
    """
    task_dir = os.path.join(UPLOAD_DIR, id)
    target = os.path.join(task_dir, download_name)
    # Security check: the target must stay inside UPLOAD_DIR.
    # os.path.commonpath() does not collapse ".." components, so both paths
    # are normalised first; otherwise an id such as ".." passes the check.
    base_abs = os.path.abspath(UPLOAD_DIR)
    target_abs = os.path.abspath(target)
    try:
        inside = os.path.commonpath([base_abs]) == os.path.commonpath(
            [base_abs, target_abs]
        )
    except ValueError:
        # commonpath() raises for paths that cannot share a prefix
        # (e.g. different drives); treat that as "outside".
        inside = False
    if not inside:
        # NOTE(review): status 103 is kept from the original code; it is an
        # informational HTTP status, 403 may have been intended — confirm.
        raise HTTPError(103, "[Error] Unauthorized path.")
    return static_file(download_name, root=task_dir)
# 清理任务
@UmiWeb.route("/api/doc/clear/<id>")
def _clear(id):
    """Clear the task ``id`` via ``_DocUnitManager.clear``.

    Returns code 100 when the manager reports success, otherwise code 101
    with a message saying the id does not exist.
    """
    if not _DocUnitManager.clear(id):
        return {"code": 101, "data": f"{id} does not exist."}
    return {"code": 100, "data": "Success"}

View File

@@ -0,0 +1,176 @@
import json
from .bottle import request
from ..mission.mission_ocr import MissionOCR
from ..utils.utils import initConfigDict
from ..ocr.output.tools import getDataText
# 获取ocr配置字典。 is_format=False 时不含 format 选项。
def get_ocr_options(is_format=True):
    """Build the option-definition dict exposed by the OCR HTTP API.

    The dict holds the local OCR engine options (keys prefixed with "ocr."),
    the layout-parsing options ("tbpu.parser", "tbpu.ignoreArea") and, unless
    ``is_format`` is False, the "data.format" option. The assembled dict is
    passed through ``initConfigDict`` before being returned.
    """
    # OCR engine parameters, namespaced with the "ocr." prefix
    engine_opts = MissionOCR.getLocalOptions()
    options = {f"ocr.{name}": engine_opts[name] for name in engine_opts}

    # Layout-parsing scheme
    parser_choices = [
        ["multi_para", "多栏-按自然段换行"],
        ["multi_line", "多栏-总是换行"],
        ["multi_none", "多栏-无换行"],
        ["single_para", "单栏-按自然段换行"],
        ["single_line", "单栏-总是换行"],
        ["single_none", "单栏-无换行"],
        ["single_code", "单栏-保留缩进"],
        ["none", "不做处理"],
    ]
    options["tbpu.parser"] = {
        "title": "排版解析方案",
        "toolTip": "按什么方式,解析和排序图片中的文字块",
        "default": "multi_para",
        "optionsList": parser_choices,
    }

    # Ignore areas
    options["tbpu.ignoreArea"] = {
        "title": "忽略区域",
        "toolTip": "数组,每一项为[[左上角x,y],[右下角x,y]]。",
        "default": [],
        "type": "var",
    }

    # Output format (optional)
    if is_format:
        format_choices = [
            ["dict", "含有位置等信息的原始字典"],
            ["text", "纯文本"],
        ]
        options["data.format"] = {
            "title": "数据返回格式",
            "toolTip": '返回值字典中,["data"] 按什么格式表示OCR结果数据',
            "default": "dict",
            "optionsList": format_choices,
        }

    # Normalise the definitions
    return initConfigDict(options)
# 检查ocr参数字典返回修改后字典
def _is_ignore_area_point(point):
    """Return True when ``point`` is a list of exactly two real numbers."""
    return (
        isinstance(point, list)
        and len(point) == 2
        # bool is a subclass of int but is never a meaningful coordinate
        and all(
            isinstance(v, (int, float)) and not isinstance(v, bool) for v in point
        )
    )


def check_ocr_options(opts):
    """Validate the OCR option dict in place and return it.

    Only "tbpu.ignoreArea" is checked. When present and non-empty it must be
    a list whose items have the form ``[[x1, y1], [x2, y2]]``; each item is
    rewritten to the four-slot form ``[[x1, y1], [], [x2, y2], []]``.
    A missing or empty "tbpu.ignoreArea" leaves ``opts`` untouched.

    Raises:
        Exception: when "tbpu.ignoreArea" is not a list or any item does not
            match the expected shape.
    """
    ia = opts.get("tbpu.ignoreArea")
    if not ia:
        return opts

    error_msg = (
        f"tbpu.ignoreArea 中,每一项的格式必须是 [[x1,y1],[x2,y2]] 。当前值不合法: {ia}"
    )
    # A scalar or mapping would otherwise fail with an unrelated TypeError.
    if not isinstance(ia, (list, tuple)):
        raise Exception(error_msg)

    new_ia = []
    for a in ia:
        if (
            not isinstance(a, list)
            or len(a) != 2
            or not _is_ignore_area_point(a[0])
            or not _is_ignore_area_point(a[1])
        ):
            raise Exception(error_msg)
        # Copy the coordinates; slots 1 and 3 are left empty
        new_ia.append([[a[0][0], a[0][1]], [], [a[1][0], a[1][1]], []])
    opts["tbpu.ignoreArea"] = new_ia
    return opts
# 路由函数
def init(UmiWeb):
    """Register the OCR HTTP routes on the given Bottle application."""

    @UmiWeb.route("/api/ocr/get_options")
    def _get_options_json():
        """Return the OCR option definitions as a JSON string."""
        return json.dumps(get_ocr_options())

    @UmiWeb.route("/api/ocr", method="POST")
    def _ocr():
        """Run OCR synchronously on one base64 image (POST, JSON body).

        JSON parameters:
            "base64": image data (required).
            "options": optional dict; its keys correspond to those returned
                by /api/ocr/get_options. Missing keys get their defaults.

        Returns a JSON string. Codes 800-804 report request problems;
        otherwise the mission's own result dict is returned.
        """
        try:
            data = request.json
        except Exception as e:
            return json.dumps({"code": 800, "data": f"请求无法解析为json。 {e}"})
        if not data:
            return json.dumps({"code": 801, "data": "请求为空。"})
        # A JSON array/string cannot carry a "base64" field; treat it as
        # missing instead of crashing further down.
        if not isinstance(data, dict) or "base64" not in data:
            return json.dumps({"code": 802, "data": "请求中缺少 base64 字段。"})
        opt = data.get("options", {})
        if not isinstance(opt, dict):
            return json.dumps({"code": 803, "data": "请求中 options 字段必须为字典。"})
        try:
            # Fill in defaults for options the caller did not supply
            default = get_ocr_options()
            for key in default:
                if key not in opt:
                    opt[key] = default[key]["default"]
            # Validate the OCR parameters
            check_ocr_options(opt)
        except Exception as e:
            return json.dumps({"code": 804, "data": f"options 解释失败。 {e}"})
        # Run synchronously. The mission is wrapped in a list: the result is
        # indexed per mission below, and the sibling qrcode_server call passes
        # a list of mission dicts in the same way.
        # NOTE(review): addMissionWait is not visible here — confirm it
        # expects a list of missions.
        resList = MissionOCR.addMissionWait(opt, [{"base64": data["base64"]}])
        res = resList[0]["result"]
        # Convert to plain text only when requested and the OCR succeeded
        if opt["data.format"] == "text" and res["code"] == 100:
            res["data"] = getDataText(res["data"])
        return json.dumps(res)
"""
const url = "http://127.0.0.1:1224/api/ocr";
const data = {
// 必填
"base64": "iVBORw0KGgoAAAANSUhEUgAAAC4AAAAXCAIAAAD7ruoFAAAACXBIWXMAABnWAAAZ1gEY0crtAAAAEXRFWHRTb2Z0d2FyZQBTbmlwYXN0ZV0Xzt0AAAHjSURBVEiJ7ZYrcsMwEEBXnR7FLuj0BPIJHJOi0DAZ2qSsMCxEgjYrDQqJdALrBJ2ASndRgeNI8ledutOCLrLl1e7T/mRkjIG/IXe/DWBldRTNEoQSpgNURe5puiiaJehrMuJSXSTgbaby0A1WzLrCCQCmyn0FwoN0V06QONWAt1nUxfnjHYA8p65GjhDKxcjedVH6JOejBPwYh21eE0Wzfe0tqIsEkGXcVcpoMH4CRZ+P0lsQp/pWJ4ripf1XFDFe8GHSHlYcSo9Es31t60RdFlN1RUmrma5oTzTVB8ZUaeeYEC9GmL6kNkDw9BANAQYo3xTNdqUkvHq+rYhDKW0Bj3RSEIpmyWyBaZaMTCrCK+tJ5Jsa07fs3E7esE66HzralRLgJKp0/BD6fJRSxvmDsb6joqkcFXGqMVVFFEHDL2gTxwCAaTabnkFUWhDCHTd9iYrGcAL1ZnqIp5Vpiqh7bCfua7FA4qN0INMcN1+cgCzj+UFxtbmvwdZvGIrI41JiqhZBWhhF8WxorkYPpQwJiWYJeA3rXE4hzcwJ+B96F9zCFHC0FcVegghvFul7oeEE8PvHeJqC0w0AUbbFIT8JnEwGbPKcS2OxU3HMTqD0r4wgEIuiKJ7i4MS16+og8/+bPZRPLa+6Ld2DSzcAAAAASUVORK5CYII=",
"options": {
"ocr.angle": false,
"ocr.language": "简体中文",
"ocr.maxSideLen": 1024,
"tbpu.parser": "multi_para",
"data.format": "text",
}
};
fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify(data)
})
.then(response => response.json())
.then(data => {
console.log(data);
})
.catch(error => {
console.error(error);
});
const url = "http://127.0.0.1:1224/api/ocr/get_options";
fetch(url, {
method: "GET",
headers: {
"Content-Type": "application/json"
},
})
.then(response => response.json())
.then(data => {
console.log(data);
})
.catch(error => {
console.error(error);
});
"""

View File

@@ -0,0 +1,127 @@
import json
import base64
from io import BytesIO
from .bottle import request
from ..mission.mission_qrcode import MissionQRCode
# 从base64识别二维码图片。传入data指令字典 {"base64", "options"}
# 返回字典 {"code", "data"}
def base2text(data):
    """Decode the codes found in a base64-encoded image.

    ``data`` is the command dict ``{"base64", "options"}`` ("options" is
    optional). Returns the "result" entry of the first mission returned by
    ``MissionQRCode.addMissionWait``.
    """
    image_b64 = data["base64"]
    options = data.get("options", {})
    missions = [{"base64": image_b64}]
    results = MissionQRCode.addMissionWait(options, missions)
    first = results[0]
    return first["result"]
# 从文本生成base64。传入data指令字典 {"text", "xxx"}
# 返回 {"code", "data"}
def text2base(data):
    """Render ``data["text"]`` as a code image and return it base64-encoded.

    Optional settings are read from ``data["options"]``: "format" (default
    "QRCode"), "w" and "h" (default 0), "quiet_zone" and "ec_level"
    (default -1). The image is saved as JPEG.

    Returns ``{"code": 100, "data": <base64 str>}`` on success, or
    ``{"code": 200, "data": "[Error] ..."}`` when image creation or encoding
    fails.
    """
    text = data["text"]
    options = data.get("options", {})
    code_format = options.get("format", "QRCode")
    width = options.get("w", 0)
    height = options.get("h", 0)
    quiet_zone = options.get("quiet_zone", -1)
    ec_level = options.get("ec_level", -1)
    try:
        image = MissionQRCode.createImage(
            text, code_format, width, height, quiet_zone, ec_level
        )
        buffer = BytesIO()
        image.save(buffer, format="JPEG")
        raw = buffer.getvalue()
        encoded = base64.b64encode(raw).decode("ascii")
    except Exception as e:
        return {"code": 200, "data": f"[Error] {str(e)}"}
    return {"code": 100, "data": encoded}
# 路由函数
def init(UmiWeb):
    """Register the QR-code HTTP route on the given Bottle application."""

    @UmiWeb.route("/api/qrcode", method="POST")
    def _qrcode():
        """Decode or generate a code image (POST, JSON body).

        A body containing "base64" is decoded with ``base2text``; otherwise a
        body containing "text" is rendered with ``text2base``. Returns a JSON
        string; codes 800-802 report request problems.
        """
        try:
            data = request.json
        except Exception:
            return json.dumps({"code": 800, "data": "请求无法解析为json。"})
        if not data:
            return json.dumps({"code": 801, "data": "请求为空。"})
        # Only a JSON object can carry the command keys; a JSON string such
        # as "base64" would pass the `in` test and then crash on indexing.
        if isinstance(data, dict):
            if "base64" in data:
                return json.dumps(base2text(data))
            if "text" in data:
                return json.dumps(text2base(data))
        return json.dumps({"code": 802, "data": '指令中不存在 "base64""text"'})
"""
// 文本 → 二维码base64
const url = "http://127.0.0.1:1224/api/qrcode";
const text = "测试文本";
const data = {
// 必填
"text": text,
// 选填
"options": {
"format": "QRCode", // 码类型
"w": 0, // 图像宽度0为自动设为最小宽度
"h": 0, // 图像高度
"quiet_zone": -1, // 码四周的空白边缘宽度,-1为自动
"ec_level": -1, // 纠错等级,-1为自动
}
};
fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify(data)
})
.then(response => response.json())
.then(data => {
if(data.code === 100) {
console.log("生成二维码base64", data.data);
}
else {
console.log("生成二维码base64失败错误码", data.code, " 错误信息:", data.data);
}
})
.catch(error => {
console.error(error);
});
// 二维码base64 → 文本
const url = "http://127.0.0.1:1224/api/qrcode";
const base64 = "/9j/4AAQSkZJRgABAQAAAQABAAD/2wBDAAgGBgcGBQgHBwcJCQgKDBQNDAsLDBkSEw8UHRofHh0aHBwgJC4nICIsIxwcKDcpLDAxNDQ0Hyc5PTgyPC4zNDL/wAALCAAdAB0BAREA/8QAHwAAAQUBAQEBAQEAAAAAAAAAAAECAwQFBgcICQoL/8QAtRAAAgEDAwIEAwUFBAQAAAF9AQIDAAQRBRIhMUEGE1FhByJxFDKBkaEII0KxwRVS0fAkM2JyggkKFhcYGRolJicoKSo0NTY3ODk6Q0RFRkdISUpTVFVWV1hZWmNkZWZnaGlqc3R1dnd4eXqDhIWGh4iJipKTlJWWl5iZmqKjpKWmp6ipqrKztLW2t7i5usLDxMXGx8jJytLT1NXW19jZ2uHi4+Tl5ufo6erx8vP09fb3+Pn6/9oACAEBAAA/APU/GfjM+EjAzW9o0DW8txNPdXEkSxKkkMYAEcUjMS069hjBrn3+K0yi3B0/RozO52y3OtG3gaPy7WRWV5IVJO27DFSoIEbYycCrF18Sb2z1a20u70rTbO8uLiKzigutRl3NcNDBIyAxW7rhTcIu4sAcE8Cu00LU/wC2/D2mat5Pk/brSK58rdu2b0Dbc4GcZxnAri/iSdPGs6AuqySW+nzpcW11dg27xwIzQspkimikDIZUiG/5QhK5PzCuPI1qz8ISalajUtNu1czLGsxnt7tHhhhiijNmkSF22W8aFeFWZ2RjIjeVXvrq0t/EWmaTpq3d9rTXFpCqpa2iRW92sCJOUP2WZYjEsNszrG7Bd/GNhr2zQtP/ALI8PaZpuMfY7SK3x5nmY2IF+9tXd067Vz6DpXH/ABK1LVrN7SLTIr6622k159isYYnknkjuLVUI8yGXGzzWfhc5UHPFeeSyav4dtI9R8O+Ho5dYS4WNrSK1EV2sb29ncFJY7aOPzIkkYhjhSGaME7WdHy72y8NWthbfDxrrfDDdpdXH2eVvtIu/IcStcOUaCGFMqGKNKUELZDEsU+g/DUcMXhXSI7cRrAllCsYjIKhQgxgh3BGP9t/95upk1PQtH1vyv7W0qxv/ACc+X9rt0l2ZxnG4HGcDp6Co7Xw1oNiipaaJptuiPvVYbVEAbcjZGB13RxnPqin+EYksdC0fTIo4rDSrG0jjlM6JBbpGFkKlC4AHDFSVz1wcdKuQQQ2tvFb28UcMESBI441CqigYAAHAAHGK/9k="
const data = { "base64": base64 };
fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify(data)
})
.then(response => response.json())
.then(data => {
if(data.code === 100) {
console.log("识别二维码成功!图片中的二维码数量:", data.data.length);
console.log("所有码的内容:");
for (let d of data.data) {
console.log(" 文本:", d.text);
console.log(" 格式:", d.format);
console.log(" 方向:", d.orientation);
console.log(" ====");
}
}
else {
console.log("识别二维码失败!错误码:", data.code, " 错误信息:", data.data);
}
})
.catch(error => {
console.error(error);
});
"""

View File

@@ -0,0 +1,163 @@
# =========================================================
# ======= Web服务器 ========
# ======= http接口可复用于跨进程命令行、防止多开等方面 ========
# =========================================================
from PySide2.QtCore import QThreadPool, QRunnable
from wsgiref.simple_server import make_server, WSGIServer
from umi_log import logger
from ..utils import pre_configs
from ..utils.call_func import CallFunc
from .bottle import Bottle, ServerAdapter, request, HTTPResponse, response, BaseRequest
from .cmd_server import CmdServer
from . import ocr_server
from . import qrcode_server
from . import doc_server
BaseRequest.MEMFILE_MAX = 104857600 # upper limit for a single request body: 100 MB
# The Bottle application every route in this package is registered on
UmiWeb = Bottle()
Host = "127.0.0.1" # overwritten by runUmiWeb() with the host supplied from QML
# 允许跨域
@UmiWeb.hook("before_request")
def _validate_before():
    """Rewrite a CORS preflight request to the method it asks permission for.

    When the request is an OPTIONS request carrying an
    Access-Control-Request-Method header, REQUEST_METHOD in the WSGI
    environ is replaced by that header's value.
    """
    environ = request.environ
    requested_method = environ.get("HTTP_ACCESS_CONTROL_REQUEST_METHOD")
    is_options = environ.get("REQUEST_METHOD") == "OPTIONS"
    if is_options and requested_method:
        environ["REQUEST_METHOD"] = requested_method
@UmiWeb.hook("after_request")
def _validate_after():
    """Attach permissive CORS headers to every response."""
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Headers", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, PUT, OPTIONS"),
    )
    for header_name, header_value in cors_headers:
        response.headers[header_name] = header_value
# ============================== 基础路由 ==============================
@UmiWeb.route("/")
@UmiWeb.route("/umiocr")
def _umiocr():
    """Return the project's full name; served at both "/" and "/umiocr"."""
    # Project information is imported when the route is hit
    import umi_about

    return umi_about.UmiAbout["fullname"]
# 跨进程接收命令行参数
@UmiWeb.route("/argv", method="POST")
def _argv():
    """Receive command-line arguments sent from another process.

    Only requests whose REMOTE_ADDR is 127.0.0.1 are executed through
    ``CmdServer.execute``; any other client gets a 401 response.
    """
    remote_addr = request.environ.get("REMOTE_ADDR")
    if remote_addr != "127.0.0.1":
        msg = "Unauthorized access. Only local requests are allowed.\n此接口只允许本机访问。"
        return HTTPResponse(msg, status=401)
    payload = request.json
    return CmdServer.execute(payload)
# Register the routes of each sub-server on the shared application
ocr_server.init(UmiWeb)
qrcode_server.init(UmiWeb)
doc_server.init(UmiWeb)
# =============== Custom server adapter, so the service can be stopped on demand ===============
QmlCallback = None # QML callback; assigned by runUmiWeb() and called with the actual port once the server is bound
class _WSGIRefServer(ServerAdapter):
    """Bottle server adapter around wsgiref that can be stopped on demand.

    Approach based on
    https://stackoverflow.com/questions/11282218/bottle-web-framework-how-to-stop
    """

    # Upper bound on bind attempts: one per possible port number.
    _MAX_PORT_ATTEMPTS = 65536

    class CustomWSGIServer(WSGIServer):
        """WSGIServer that remembers in-flight connections so they can be force-closed."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self.activeConnections = set()  # sockets currently being processed

        def process_request(self, request, client_address):
            """Track ``request`` while the base class handles it."""
            self.activeConnections.add(request)
            try:
                super().process_request(request, client_address)
            finally:
                # Forget the socket once it has been handled; without this
                # the set keeps one entry per request ever served.
                self.activeConnections.discard(request)

        def close_all_request(self):
            """Forcibly shut down and close every tracked connection."""
            import socket

            # Iterate over a snapshot: the serving thread may add or discard
            # entries while another thread is closing them.
            for request in tuple(self.activeConnections):
                try:
                    request.shutdown(socket.SHUT_RDWR)
                    request.close()
                    logger.info(f"强制关闭连接。 request: {request}")
                except OSError:
                    # Socket already closed or disconnected: nothing to do
                    pass
                except Exception:
                    logger.error(
                        f"强制连接异常。 request: {request}",
                        exc_info=True,
                        stack_info=True,
                    )

    def run(self, handler):
        """Bind to the first usable port and serve ``handler`` until stopped.

        The search starts at the recorded "server_port", moves up one port
        after each OSError and wraps from 65535 to 1024. The port finally
        used is written back to the settings and reported to ``QmlCallback``.

        Raises:
            OSError: when no port could be bound after trying every port
                number once.
        """
        import atexit  # exit handling

        atexit.register(self.stop)  # stop the server when the program terminates
        self.port = pre_configs.getValue("server_port")  # recorded port number
        self.host = Host
        last_error = None
        # Bounded search: a persistent OSError (e.g. an unusable host
        # address) must not turn into an endless loop.
        for _ in range(self._MAX_PORT_ATTEMPTS):
            try:
                self.server = make_server(
                    self.host,
                    self.port,
                    handler,
                    server_class=self.CustomWSGIServer,
                    **self.options,
                )
                break
            except OSError as e:  # port unavailable, try the next one
                last_error = e
                logger.warning(f"服务器端口号 {self.port} 已被占用")
                self.port += 1
                if self.port > 65535:
                    self.port = 1024
        else:
            raise OSError(
                f"No usable port found for host {self.host}"
            ) from last_error
        pre_configs.setValue("server_port", self.port)  # persist the port in use
        logger.info(f"Listening on http://{self.host}:{self.port}")
        print(f"Listening on http://{self.host}:{self.port}")
        # Hand the actual port number to the QML callback via CallFunc
        CallFunc.now(QmlCallback, self.port)
        self.server.serve_forever()

    def stop(self):
        """Close client connections and shut the server down."""
        server = getattr(self, "server", None)
        if server is None:
            # run() never got as far as creating the server (stop() is
            # registered with atexit before binding), so nothing to close.
            return
        # server.server_close() was considered as an alternative, but it
        # causes a "bad fd" exception.
        logger.debug("WEB服务器准备关闭")
        server.close_all_request()  # force-close client connections
        server.shutdown()  # stop the serve_forever loop
        logger.info("WEB服务器已关闭")
# ============================== 线程类 ==============================
class _WorkerClass(QRunnable):
    """Runnable that serves ``UmiWeb`` through a ``_WSGIRefServer`` adapter."""

    def run(self):
        """Create the server adapter, keep a reference to it and start serving."""
        adapter = _WSGIRefServer()
        self._server = adapter
        UmiWeb.run(server=adapter)


# Single worker instance handed to the thread pool by runUmiWeb()
_Worker = _WorkerClass()
# ============================== 控制接口 ==============================
# 启动web服务。传入qml对象回调函数名主机地址
def runUmiWeb(qmlObj, callback, host):
    """Start the web service on the global Qt thread pool.

    Args:
        qmlObj: QML object that owns the callback.
        callback: name of the callback attribute on ``qmlObj``; when the
            attribute is absent the stored callback is None.
        host: host address the server binds to.
    """
    global QmlCallback, Host
    Host = host
    # Look up the QML callback by name
    QmlCallback = getattr(qmlObj, callback, None)
    # Run the server worker on the global thread pool
    QThreadPool.globalInstance().start(_Worker)
# 切换端口号(下次启动生效)
def setPort(port):
    """Record ``port`` as the "server_port" setting.

    Only the stored value changes; the server reads it when it starts, so
    the new port takes effect on the next start.
    """
    pre_configs.setValue("server_port", port)