Files
work-secretfile-selfcheck/UmiOCR-data/py_src/server/cmd_server.py

525 lines
20 KiB
Python
Raw Normal View History

# ===============================================
# =============== 命令行-解析和执行 ===============
# ===============================================
import time
import json
import argparse
from threading import Condition
from ..utils.call_func import CallFunc
from ..utils.file_finder import findFiles
from ..event_bus.pubsub_service import PubSubService # 发布/订阅管理器
# 命令执行器
class _Actuator:
def __init__(self):
self.pyDict = {} # python模块字典
self.qmlDict = {} # qml模块字典
self.tagPageConn = None # 页面连接器的引用
# 初始化并收集信息。传入qml模块字典
def initCollect(self, qmlModuleDict):
qmlModuleDict = qmlModuleDict.toVariant()
self.qmlDict.update(qmlModuleDict)
# 获取页面连接器实例
from ..tag_pages.tag_pages_connector import TagPageConnObj
self.tagPageConn = TagPageConnObj
# ============================== 页面管理 ==============================
# 返回当前 [可创建的页面模板] 和 [已创建的页面] 的信息
def getAllPages(self):
TabViewManager = self.qmlDict["TabViewManager"]
pageList = TabViewManager.getPageList().toVariant()
infoStr = "All opened pages:\npage_index\tkey\ttitle\n"
for index, value in enumerate(pageList):
infoStr += f'{index}\t{value["ctrlKey"]}\t{value["info"]["title"]}\n'
infoList = TabViewManager.getInfoList().toVariant()
infoStr += (
"\nAll page templates that can be opened:\ntemplate_index\tkey\ttitle\n"
)
for index, value in enumerate(infoList):
infoStr += f'{index}\t{value["key"]}\t{value["title"]}\n'
infoStr += "\nUsage of create a page:\n"
infoStr += " Umi-OCR --add_page [template_index]\n"
infoStr += "Usage of delete a page:\n"
infoStr += " Umi-OCR --del_page [page_index]\n"
infoStr += "Usage of query the modules that can be called:\n"
infoStr += " Umi-OCR --all_modules\n"
return infoStr
# 创建页面
def addPage(self, index):
try:
index = int(index)
except ValueError:
return f"[Error] template_index must be integer, not {index}."
TabViewManager = self.qmlDict["TabViewManager"]
infoList = TabViewManager.getInfoList().toVariant()
l = len(infoList) - 1
if index < 0 or index > l:
return f"[Error] template_index {index} out of range (0~{l})."
return self.call("TabViewManager", "qml", "addTabPage", False, -1, index)
# 删除页面
def delPage(self, index):
try:
index = int(index)
except ValueError:
return f"[Error] page_index must be integer, not {index}."
TabViewManager = self.qmlDict["TabViewManager"]
pageList = TabViewManager.getPageList().toVariant()
l = len(pageList) - 1
if index < 0 or index > l:
return f"[Error] page_index {index} out of range (0~{l})."
return self.call("TabViewManager", "qml", "delTabPage", False, index)
# 通过key创建页面
def addPageByKey(self, key):
# 1. 检查截图标签页,如果未创建则创建
module, _ = self.getModuleFromName(key, "qml")
if module == None:
tvm = self.qmlDict["TabViewManager"]
infoList = tvm.getInfoList().toVariant()
f2 = False
for i, v in enumerate(infoList):
if v["key"] == key:
f2 = True
self.addPage(i)
break
if not f2:
return f"[Error] Template {key} not found."
for i in range(10):
time.sleep(0.5)
module, _ = self.getModuleFromName(key, "qml")
if module != None:
break
if module == None:
return f"[Error] Unable to create template {key}."
return "[Success]"
# ============================== 动态模块调用 ==============================
# 返回所有可调用模块
def getModules(self):
pyd, qmld = {}, {}
pages = self.tagPageConn.pages
for p in pages:
if pages[p]["qmlObj"]:
qmld[p] = pages[p]["qmlObj"]
if pages[p]["pyObj"]:
pyd[p] = pages[p]["pyObj"]
pyd.update(self.pyDict)
qmld.update(self.qmlDict)
return {"py": pyd, "qml": qmld}
# 传入(不完整的)模块名搜索并返回模块实例。type: py / qml
def getModuleFromName(self, moduleName, type_):
d = self.getModules()[type_]
module = None
if moduleName in d:
module = d[moduleName]
else:
for name in d.keys(): # 若输入模块名的前几个字母,也可以匹配
if name.startswith(moduleName):
moduleName = name
module = d[name]
break
return module, moduleName
# 返回所有可调用模块的帮助信息
def getModulesHelp(self):
modules = self.getModules()
help = "\nPython modules: (Usage: Umi-OCR --call_py [module name])\n"
for k in modules["py"].keys():
help += f" {k}\n"
help += "\nQml modules: (Usage: Umi-OCR --call_qml [module name])\n"
for k in modules["qml"].keys():
help += f" {k}\n"
help += f"\nTips: module name can only write the first letters, such as [ScreenshotOCR_1] → [Scr]"
return help
# 返回一个模块的所有函数的帮助信息
def getModuleFuncsHelp(self, moduleName, type_):
module, moduleName = self.getModuleFromName(moduleName, type_)
typeStr = "Python" if type_ == "py" else "qml"
if not module:
return f'[Error] {typeStr} module "{moduleName}" non-existent.'
funcs = [
func
for func in vars(type(module)).keys()
if callable(getattr(module, func))
]
help = f'All functions in {typeStr} module "{moduleName}":\n'
for f in funcs:
f = str(f)
if not f.startswith("_"):
help += f" {f}\n"
help += f"Usage: Umi-OCR --call_qml {moduleName} --func [function name]\n"
return help
# 调用一个模块函数。type: py / qml , thread: True 同步在子线程 / False 异步在主线程
def call(self, moduleName, type_, funcName, thread, *paras):
module, moduleName = self.getModuleFromName(moduleName, type_)
typeStr = "Python" if type_ == "py" else "qml"
if not module:
return f'[Error] {typeStr} module "{moduleName}" non-existent.'
func = getattr(module, funcName, None)
if not func:
return f'[Error] func "{funcName}" not exist in {typeStr} module "{moduleName}".'
try:
if thread: # 在子线程执行,返回结果
return func(*paras)
else: # 在主线程执行,返回标志文本
CallFunc.now(func, *paras) # 在主线程中调用回调函数
return f'Calling "{funcName}" in main thread.'
except Exception as e:
return f'[Error] calling {typeStr} module "{moduleName}" - "{funcName}" {paras}: {e}'
# ============================== 便捷指令 ==============================
# 控制主窗口
def ctrlWindow(self, show, hide, quit):
if show:
self.call("MainWindow", "qml", "setVisibility", False, True)
return "Umi-OCR show."
elif hide:
self.call("MainWindow", "qml", "setVisibility", False, False)
return "Umi-OCR hide."
elif quit:
self.call("MainWindow", "qml", "quit", False)
return "Umi-OCR quit."
# 快捷OCR截图/粘贴/路径,并获取返回结果
def quick_ocr(self, ss, clip, paras):
# 1. 检查截图标签页,如果未创建则创建
msg = self.addPageByKey("ScreenshotOCR")
if msg != "[Success]":
return msg
# 2. 订阅事件,监听 <<ScreenshotOcrEnd>>
isOcrEnd = False
resList = []
condition = Condition() # 线程同步器
def onOcrEnd(recentResult):
nonlocal isOcrEnd, resList
isOcrEnd = True
resList = recentResult
with condition: # 释放线程阻塞
condition.notify()
PubSubService.subscribe("<<ScreenshotOcrEnd>>", onOcrEnd)
# 3. 调用截图标签页的函数
if ss: # 截图
if not paras: # 无参数,手动截图
self.call("ScreenshotOCR", "qml", "screenshot", False)
else: # 有参数,自动截图 umi-ocr --screenshot screen=0 rect=0,100,500,200
rect = [0, 0, 0, 0] # 截图矩形框
screen = 0 # 显示器编号
para_args = []
try:
for para in paras: # 空格分隔
para_args.extend(para.split())
for part in para_args:
if part.startswith("rect="):
rect_values = part[len("rect=") :].split(",")
rect_values = [int(v) for v in rect_values]
rect[: len(rect_values)] = rect_values # 补齐rect的值
elif part.startswith("screen="):
screen = int(part[len("screen=") :])
self.call(
"ScreenshotOCR", "qml", "autoScreenshot", False, rect, screen
)
except Exception as e:
return f"[Error] {e}"
elif clip: # 粘贴
self.call("ScreenshotOCR", "qml", "paste", False)
else: # 路径
if not paras:
return "[Error] Paths is empty."
paths = findFiles(paras, "image", True) # 递归搜索
if not paths:
return "[Error] No valid path."
self.call("ScreenshotOCR", "qml", "ocrPaths", False, paths)
# 4. 堵塞等待任务完成,注销事件订阅
with condition:
while not isOcrEnd:
condition.wait()
PubSubService.unsubscribe("<<ScreenshotOcrEnd>>", onOcrEnd)
# 5. 处理结果列表,转文本
text = ""
for i, r in enumerate(resList): # 遍历图片
if text and not text.endswith("\n"): # 如果上次结果结尾没有换行,则补换行
text += "\n"
if r["code"] == 100:
for d in r["data"]: # 遍历文本块
text += d["text"] + d["end"]
elif r["code"] != 101 and isinstance(r["data"], str):
text += r["data"]
if not text:
text = "[Message] No text in OCR result."
return text
# 创建二维码
def qrcode_create(self, paras):
if len(paras) < 2:
return (
'[Error] Not enough arguments passed! Must pass "text" "save_image.jpg"'
)
text, path = paras[0], paras[1]
if len(paras) == 3:
w = h = int(paras[2])
elif len(paras) == 4:
w, h = int(paras[2]), int(paras[3])
else:
w = h = 0
try:
from ..mission.mission_qrcode import MissionQRCode
pil = MissionQRCode.createImage(
text,
format="QRCode", # 格式
w=w, # 宽高
h=h,
quiet_zone=-1, # 边缘宽度
ec_level=-1, # 纠错等级
)
if isinstance(pil, str):
return pil
pil.save(path)
return f"Successfully saved to {path}"
except Exception as e:
return f"[Error] {str(e)}"
# 识别二维码
def qrcode_read(self, paras):
if len(paras) < 1:
return '[Error] Not enough arguments passed! Must pass "image_to_recognize.jpg"'
try:
from ..mission.mission_qrcode import MissionQRCode
from PIL import Image
except Exception as e:
return f"[Error] {str(e)}"
resText = ""
paths = findFiles(paras, "image", True) # 递归搜索图片
for index, path in enumerate(paths):
if index != 0:
resText += "\n"
try:
pil = Image.open(path)
res = MissionQRCode.addMissionWait({}, [{"pil": pil}])
res = res[0]["result"]
if res["code"] == 100:
t = ""
for i, d in enumerate(res["data"]):
if i != 0:
t += "\n"
t += d["text"]
resText += t
elif res["code"] == 101:
resText += "No code in image."
else:
resText += f"[Error] Code: {res['code']}\nMessage: {res['data']}"
except Exception as e:
resText += f"[Error] {str(e)}"
return resText
CmdActuator = _Actuator()
# 命令解析器
class _Cmd:
def __init__(self):
self._parser = None
def init(self):
if self._parser:
return
self._parser = argparse.ArgumentParser(prog="Umi-OCR")
# 便捷指令
self._parser.add_argument(
"--show", action="store_true", help="Make the app appear in the foreground."
)
self._parser.add_argument(
"--hide", action="store_true", help="Hide app in the background."
)
self._parser.add_argument("--quit", action="store_true", help="Quit app.")
self._parser.add_argument(
"--screenshot",
action="store_true",
help="Screenshot OCR and output the result.",
)
self._parser.add_argument(
"--clipboard",
action="store_true",
help="Clipboard OCR and output the result.",
)
self._parser.add_argument(
"--path",
action="store_true",
help="OCR the image in path and output the result.",
)
self._parser.add_argument(
"--qrcode_create",
action="store_true",
help='Create a QR code from the text. Use --qrcode_create "text" "save_image.jpg"',
)
self._parser.add_argument(
"--qrcode_read",
action="store_true",
help='Read the QR code. Use --qrcode_read "image_to_recognize.jpg"',
)
self._parser.add_argument(
"--reload",
action="store_true",
help='Reload settings from the configuration file ".settings"',
)
# 页面管理
self._parser.add_argument(
"--all_pages",
action="store_true",
help="Output all template and page information.",
)
self._parser.add_argument(
"--add_page", type=int, help="usage: Umi-OCR --all_pages"
)
self._parser.add_argument(
"--del_page", type=int, help="usage: Umi-OCR --all_pages"
)
# 函数调用
self._parser.add_argument(
"--all_modules",
action="store_true",
help="Output all module names that can be called.",
)
self._parser.add_argument(
"--call_py", help="Calling a function on a Python module."
)
self._parser.add_argument(
"--call_qml", help="Calling a function on a Qml module."
)
self._parser.add_argument(
"--func", help="The name of the function to be called."
)
self._parser.add_argument(
"--thread",
action="store_true",
help="The function will be called on the child thread and return the result, but it may be unstable or cause QML to crash.",
)
# 输出
self._parser.add_argument(
"--clip",
action="store_true",
help="Copy the results to the clipboard.",
)
self._parser.add_argument(
"--output",
help="The path to the file where results will be saved. (overwrite)",
)
self._parser.add_argument(
"--output_append",
help="The path to the file where results will be saved. (append)",
)
self._parser.add_argument("-->", help='"-->" equivalent to "--output"')
self._parser.add_argument("-->>", help='"-->>" equivalent to "--output_append"')
self._parser.add_argument("paras", nargs="*", help="parameters of [--func].")
# 分析指令,返回指令对象或报错字符串
def parse(self, argv):
self.init()
# 特殊情况
if "-h" in argv or "--help" in argv: # 帮助
return self._parser.format_help()
if len(argv) == 0: # 空指令
CmdActuator.ctrlWindow(True, False, False) # 展示主窗
return self._parser.format_help() # 返回帮助
# 正常解析
try:
return self._parser.parse_args(argv)
except SystemExit as e:
return f"Your argv: {argv}\n[Error]: {e}\nusage: Umi-OCR --help"
except Exception as e:
return f"Your argv: {argv}\n[Error]: {e}\nusage: Umi-OCR --help"
# 执行指令,返回执行结果字符串
def execute(self, argv):
args = self.parse(argv)
if isinstance(args, str):
return args
if args.all_modules:
return CmdActuator.getModulesHelp()
# 便捷指令
if args.show or args.hide or args.quit: # 控制主窗
return CmdActuator.ctrlWindow(args.show, args.hide, args.quit)
if args.screenshot or args.clipboard or args.path: # 快捷识图
return CmdActuator.quick_ocr(args.screenshot, args.clipboard, args.paras)
if args.qrcode_create: # 写二维码
return CmdActuator.qrcode_create(args.paras)
if args.qrcode_read: # 读二维码
return CmdActuator.qrcode_read(args.paras)
if args.reload: # 重新加载配置
PubSubService.publish("<<settingsReload>>")
return "Settings reload."
# 页面管理
if args.all_pages:
return CmdActuator.getAllPages()
if not args.add_page is None:
return CmdActuator.addPage(args.add_page)
if not args.del_page is None:
return CmdActuator.delPage(args.del_page)
# 动态模块调用
if args.call_py:
if args.func:
return CmdActuator.call(
args.call_py,
"py",
args.func,
args.thread,
*self.format_paras(args.paras),
)
else:
return CmdActuator.getModuleFuncsHelp(args.call_py, "py")
if args.call_qml:
if args.func:
return CmdActuator.call(
args.call_qml,
"qml",
args.func,
args.thread,
*self.format_paras(args.paras),
)
else:
return CmdActuator.getModuleFuncsHelp(args.call_qml, "qml")
# paras 格式化
def format_paras(self, paras):
def convert_param(param):
try:
return int(param)
except ValueError:
pass
try:
return float(param)
except ValueError:
pass
try:
return json.loads(param)
except json.JSONDecodeError:
pass
return param
return [convert_param(p) for p in paras]
CmdServer = _Cmd()