Files

525 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ===============================================
# =============== 命令行-解析和执行 ===============
# ===============================================
import time
import json
import argparse
from threading import Condition
from ..utils.call_func import CallFunc
from ..utils.file_finder import findFiles
from ..event_bus.pubsub_service import PubSubService # 发布/订阅管理器
# 命令执行器
class _Actuator:
def __init__(self):
self.pyDict = {} # python模块字典
self.qmlDict = {} # qml模块字典
self.tagPageConn = None # 页面连接器的引用
# 初始化并收集信息。传入qml模块字典
def initCollect(self, qmlModuleDict):
qmlModuleDict = qmlModuleDict.toVariant()
self.qmlDict.update(qmlModuleDict)
# 获取页面连接器实例
from ..tag_pages.tag_pages_connector import TagPageConnObj
self.tagPageConn = TagPageConnObj
# ============================== 页面管理 ==============================
# 返回当前 [可创建的页面模板] 和 [已创建的页面] 的信息
def getAllPages(self):
TabViewManager = self.qmlDict["TabViewManager"]
pageList = TabViewManager.getPageList().toVariant()
infoStr = "All opened pages:\npage_index\tkey\ttitle\n"
for index, value in enumerate(pageList):
infoStr += f'{index}\t{value["ctrlKey"]}\t{value["info"]["title"]}\n'
infoList = TabViewManager.getInfoList().toVariant()
infoStr += (
"\nAll page templates that can be opened:\ntemplate_index\tkey\ttitle\n"
)
for index, value in enumerate(infoList):
infoStr += f'{index}\t{value["key"]}\t{value["title"]}\n'
infoStr += "\nUsage of create a page:\n"
infoStr += " Umi-OCR --add_page [template_index]\n"
infoStr += "Usage of delete a page:\n"
infoStr += " Umi-OCR --del_page [page_index]\n"
infoStr += "Usage of query the modules that can be called:\n"
infoStr += " Umi-OCR --all_modules\n"
return infoStr
# 创建页面
def addPage(self, index):
try:
index = int(index)
except ValueError:
return f"[Error] template_index must be integer, not {index}."
TabViewManager = self.qmlDict["TabViewManager"]
infoList = TabViewManager.getInfoList().toVariant()
l = len(infoList) - 1
if index < 0 or index > l:
return f"[Error] template_index {index} out of range (0~{l})."
return self.call("TabViewManager", "qml", "addTabPage", False, -1, index)
# 删除页面
def delPage(self, index):
try:
index = int(index)
except ValueError:
return f"[Error] page_index must be integer, not {index}."
TabViewManager = self.qmlDict["TabViewManager"]
pageList = TabViewManager.getPageList().toVariant()
l = len(pageList) - 1
if index < 0 or index > l:
return f"[Error] page_index {index} out of range (0~{l})."
return self.call("TabViewManager", "qml", "delTabPage", False, index)
# 通过key创建页面
def addPageByKey(self, key):
# 1. 检查截图标签页,如果未创建则创建
module, _ = self.getModuleFromName(key, "qml")
if module == None:
tvm = self.qmlDict["TabViewManager"]
infoList = tvm.getInfoList().toVariant()
f2 = False
for i, v in enumerate(infoList):
if v["key"] == key:
f2 = True
self.addPage(i)
break
if not f2:
return f"[Error] Template {key} not found."
for i in range(10):
time.sleep(0.5)
module, _ = self.getModuleFromName(key, "qml")
if module != None:
break
if module == None:
return f"[Error] Unable to create template {key}."
return "[Success]"
# ============================== 动态模块调用 ==============================
# 返回所有可调用模块
def getModules(self):
pyd, qmld = {}, {}
pages = self.tagPageConn.pages
for p in pages:
if pages[p]["qmlObj"]:
qmld[p] = pages[p]["qmlObj"]
if pages[p]["pyObj"]:
pyd[p] = pages[p]["pyObj"]
pyd.update(self.pyDict)
qmld.update(self.qmlDict)
return {"py": pyd, "qml": qmld}
# 传入(不完整的)模块名搜索并返回模块实例。type: py / qml
def getModuleFromName(self, moduleName, type_):
d = self.getModules()[type_]
module = None
if moduleName in d:
module = d[moduleName]
else:
for name in d.keys(): # 若输入模块名的前几个字母,也可以匹配
if name.startswith(moduleName):
moduleName = name
module = d[name]
break
return module, moduleName
# 返回所有可调用模块的帮助信息
def getModulesHelp(self):
modules = self.getModules()
help = "\nPython modules: (Usage: Umi-OCR --call_py [module name])\n"
for k in modules["py"].keys():
help += f" {k}\n"
help += "\nQml modules: (Usage: Umi-OCR --call_qml [module name])\n"
for k in modules["qml"].keys():
help += f" {k}\n"
help += f"\nTips: module name can only write the first letters, such as [ScreenshotOCR_1] → [Scr]"
return help
# 返回一个模块的所有函数的帮助信息
def getModuleFuncsHelp(self, moduleName, type_):
module, moduleName = self.getModuleFromName(moduleName, type_)
typeStr = "Python" if type_ == "py" else "qml"
if not module:
return f'[Error] {typeStr} module "{moduleName}" non-existent.'
funcs = [
func
for func in vars(type(module)).keys()
if callable(getattr(module, func))
]
help = f'All functions in {typeStr} module "{moduleName}":\n'
for f in funcs:
f = str(f)
if not f.startswith("_"):
help += f" {f}\n"
help += f"Usage: Umi-OCR --call_qml {moduleName} --func [function name]\n"
return help
# 调用一个模块函数。type: py / qml , thread: True 同步在子线程 / False 异步在主线程
def call(self, moduleName, type_, funcName, thread, *paras):
module, moduleName = self.getModuleFromName(moduleName, type_)
typeStr = "Python" if type_ == "py" else "qml"
if not module:
return f'[Error] {typeStr} module "{moduleName}" non-existent.'
func = getattr(module, funcName, None)
if not func:
return f'[Error] func "{funcName}" not exist in {typeStr} module "{moduleName}".'
try:
if thread: # 在子线程执行,返回结果
return func(*paras)
else: # 在主线程执行,返回标志文本
CallFunc.now(func, *paras) # 在主线程中调用回调函数
return f'Calling "{funcName}" in main thread.'
except Exception as e:
return f'[Error] calling {typeStr} module "{moduleName}" - "{funcName}" {paras}: {e}'
# ============================== 便捷指令 ==============================
# 控制主窗口
def ctrlWindow(self, show, hide, quit):
if show:
self.call("MainWindow", "qml", "setVisibility", False, True)
return "Umi-OCR show."
elif hide:
self.call("MainWindow", "qml", "setVisibility", False, False)
return "Umi-OCR hide."
elif quit:
self.call("MainWindow", "qml", "quit", False)
return "Umi-OCR quit."
# 快捷OCR截图/粘贴/路径,并获取返回结果
def quick_ocr(self, ss, clip, paras):
# 1. 检查截图标签页,如果未创建则创建
msg = self.addPageByKey("ScreenshotOCR")
if msg != "[Success]":
return msg
# 2. 订阅事件,监听 <<ScreenshotOcrEnd>>
isOcrEnd = False
resList = []
condition = Condition() # 线程同步器
def onOcrEnd(recentResult):
nonlocal isOcrEnd, resList
isOcrEnd = True
resList = recentResult
with condition: # 释放线程阻塞
condition.notify()
PubSubService.subscribe("<<ScreenshotOcrEnd>>", onOcrEnd)
# 3. 调用截图标签页的函数
if ss: # 截图
if not paras: # 无参数,手动截图
self.call("ScreenshotOCR", "qml", "screenshot", False)
else: # 有参数,自动截图 umi-ocr --screenshot screen=0 rect=0,100,500,200
rect = [0, 0, 0, 0] # 截图矩形框
screen = 0 # 显示器编号
para_args = []
try:
for para in paras: # 空格分隔
para_args.extend(para.split())
for part in para_args:
if part.startswith("rect="):
rect_values = part[len("rect=") :].split(",")
rect_values = [int(v) for v in rect_values]
rect[: len(rect_values)] = rect_values # 补齐rect的值
elif part.startswith("screen="):
screen = int(part[len("screen=") :])
self.call(
"ScreenshotOCR", "qml", "autoScreenshot", False, rect, screen
)
except Exception as e:
return f"[Error] {e}"
elif clip: # 粘贴
self.call("ScreenshotOCR", "qml", "paste", False)
else: # 路径
if not paras:
return "[Error] Paths is empty."
paths = findFiles(paras, "image", True) # 递归搜索
if not paths:
return "[Error] No valid path."
self.call("ScreenshotOCR", "qml", "ocrPaths", False, paths)
# 4. 堵塞等待任务完成,注销事件订阅
with condition:
while not isOcrEnd:
condition.wait()
PubSubService.unsubscribe("<<ScreenshotOcrEnd>>", onOcrEnd)
# 5. 处理结果列表,转文本
text = ""
for i, r in enumerate(resList): # 遍历图片
if text and not text.endswith("\n"): # 如果上次结果结尾没有换行,则补换行
text += "\n"
if r["code"] == 100:
for d in r["data"]: # 遍历文本块
text += d["text"] + d["end"]
elif r["code"] != 101 and isinstance(r["data"], str):
text += r["data"]
if not text:
text = "[Message] No text in OCR result."
return text
# 创建二维码
def qrcode_create(self, paras):
if len(paras) < 2:
return (
'[Error] Not enough arguments passed! Must pass "text" "save_image.jpg"'
)
text, path = paras[0], paras[1]
if len(paras) == 3:
w = h = int(paras[2])
elif len(paras) == 4:
w, h = int(paras[2]), int(paras[3])
else:
w = h = 0
try:
from ..mission.mission_qrcode import MissionQRCode
pil = MissionQRCode.createImage(
text,
format="QRCode", # 格式
w=w, # 宽高
h=h,
quiet_zone=-1, # 边缘宽度
ec_level=-1, # 纠错等级
)
if isinstance(pil, str):
return pil
pil.save(path)
return f"Successfully saved to {path}"
except Exception as e:
return f"[Error] {str(e)}"
# 识别二维码
def qrcode_read(self, paras):
if len(paras) < 1:
return '[Error] Not enough arguments passed! Must pass "image_to_recognize.jpg"'
try:
from ..mission.mission_qrcode import MissionQRCode
from PIL import Image
except Exception as e:
return f"[Error] {str(e)}"
resText = ""
paths = findFiles(paras, "image", True) # 递归搜索图片
for index, path in enumerate(paths):
if index != 0:
resText += "\n"
try:
pil = Image.open(path)
res = MissionQRCode.addMissionWait({}, [{"pil": pil}])
res = res[0]["result"]
if res["code"] == 100:
t = ""
for i, d in enumerate(res["data"]):
if i != 0:
t += "\n"
t += d["text"]
resText += t
elif res["code"] == 101:
resText += "No code in image."
else:
resText += f"[Error] Code: {res['code']}\nMessage: {res['data']}"
except Exception as e:
resText += f"[Error] {str(e)}"
return resText
CmdActuator = _Actuator()
# 命令解析器
class _Cmd:
def __init__(self):
self._parser = None
def init(self):
if self._parser:
return
self._parser = argparse.ArgumentParser(prog="Umi-OCR")
# 便捷指令
self._parser.add_argument(
"--show", action="store_true", help="Make the app appear in the foreground."
)
self._parser.add_argument(
"--hide", action="store_true", help="Hide app in the background."
)
self._parser.add_argument("--quit", action="store_true", help="Quit app.")
self._parser.add_argument(
"--screenshot",
action="store_true",
help="Screenshot OCR and output the result.",
)
self._parser.add_argument(
"--clipboard",
action="store_true",
help="Clipboard OCR and output the result.",
)
self._parser.add_argument(
"--path",
action="store_true",
help="OCR the image in path and output the result.",
)
self._parser.add_argument(
"--qrcode_create",
action="store_true",
help='Create a QR code from the text. Use --qrcode_create "text" "save_image.jpg"',
)
self._parser.add_argument(
"--qrcode_read",
action="store_true",
help='Read the QR code. Use --qrcode_read "image_to_recognize.jpg"',
)
self._parser.add_argument(
"--reload",
action="store_true",
help='Reload settings from the configuration file ".settings"',
)
# 页面管理
self._parser.add_argument(
"--all_pages",
action="store_true",
help="Output all template and page information.",
)
self._parser.add_argument(
"--add_page", type=int, help="usage: Umi-OCR --all_pages"
)
self._parser.add_argument(
"--del_page", type=int, help="usage: Umi-OCR --all_pages"
)
# 函数调用
self._parser.add_argument(
"--all_modules",
action="store_true",
help="Output all module names that can be called.",
)
self._parser.add_argument(
"--call_py", help="Calling a function on a Python module."
)
self._parser.add_argument(
"--call_qml", help="Calling a function on a Qml module."
)
self._parser.add_argument(
"--func", help="The name of the function to be called."
)
self._parser.add_argument(
"--thread",
action="store_true",
help="The function will be called on the child thread and return the result, but it may be unstable or cause QML to crash.",
)
# 输出
self._parser.add_argument(
"--clip",
action="store_true",
help="Copy the results to the clipboard.",
)
self._parser.add_argument(
"--output",
help="The path to the file where results will be saved. (overwrite)",
)
self._parser.add_argument(
"--output_append",
help="The path to the file where results will be saved. (append)",
)
self._parser.add_argument("-->", help='"-->" equivalent to "--output"')
self._parser.add_argument("-->>", help='"-->>" equivalent to "--output_append"')
self._parser.add_argument("paras", nargs="*", help="parameters of [--func].")
# 分析指令,返回指令对象或报错字符串
def parse(self, argv):
self.init()
# 特殊情况
if "-h" in argv or "--help" in argv: # 帮助
return self._parser.format_help()
if len(argv) == 0: # 空指令
CmdActuator.ctrlWindow(True, False, False) # 展示主窗
return self._parser.format_help() # 返回帮助
# 正常解析
try:
return self._parser.parse_args(argv)
except SystemExit as e:
return f"Your argv: {argv}\n[Error]: {e}\nusage: Umi-OCR --help"
except Exception as e:
return f"Your argv: {argv}\n[Error]: {e}\nusage: Umi-OCR --help"
# 执行指令,返回执行结果字符串
def execute(self, argv):
args = self.parse(argv)
if isinstance(args, str):
return args
if args.all_modules:
return CmdActuator.getModulesHelp()
# 便捷指令
if args.show or args.hide or args.quit: # 控制主窗
return CmdActuator.ctrlWindow(args.show, args.hide, args.quit)
if args.screenshot or args.clipboard or args.path: # 快捷识图
return CmdActuator.quick_ocr(args.screenshot, args.clipboard, args.paras)
if args.qrcode_create: # 写二维码
return CmdActuator.qrcode_create(args.paras)
if args.qrcode_read: # 读二维码
return CmdActuator.qrcode_read(args.paras)
if args.reload: # 重新加载配置
PubSubService.publish("<<settingsReload>>")
return "Settings reload."
# 页面管理
if args.all_pages:
return CmdActuator.getAllPages()
if not args.add_page is None:
return CmdActuator.addPage(args.add_page)
if not args.del_page is None:
return CmdActuator.delPage(args.del_page)
# 动态模块调用
if args.call_py:
if args.func:
return CmdActuator.call(
args.call_py,
"py",
args.func,
args.thread,
*self.format_paras(args.paras),
)
else:
return CmdActuator.getModuleFuncsHelp(args.call_py, "py")
if args.call_qml:
if args.func:
return CmdActuator.call(
args.call_qml,
"qml",
args.func,
args.thread,
*self.format_paras(args.paras),
)
else:
return CmdActuator.getModuleFuncsHelp(args.call_qml, "qml")
# paras 格式化
def format_paras(self, paras):
def convert_param(param):
try:
return int(param)
except ValueError:
pass
try:
return float(param)
except ValueError:
pass
try:
return json.loads(param)
except json.JSONDecodeError:
pass
return param
return [convert_param(p) for p in paras]
CmdServer = _Cmd()