#!/usr/bin/env python3 # @author yutent # @date 2023/08/08 14:07:26 import gi, os, json, shutil, hashlib, time, threading gi.require_version('Gtk', '3.0') gi.require_version("WebKit2", "4.1") from gi.repository import GObject, Gtk, Gdk, WebKit2, GLib, Gio, GdkPixbuf # 优先尝试使用指示器, 没有再使用 Gtk.StatusIcon try: # gir1.2-ayatanaappindicator3-0.1 gi.require_version('AyatanaAppIndicator3', '0.1') from gi.repository import AyatanaAppIndicator3 as AppIndicator3 except: AppIndicator3 = None try: # gir1.2-keybinder-3.0 gi.require_version("Keybinder", "3.0") from gi.repository import Keybinder # 初始化 Keybinder Keybinder.init() except: Keybinder = None from ._settings import create_setting from ._protocal import create_protocal from ._notify import create_notify from ._inject import Inject from ._utils import get_monitor_info, pixbuf_to_dict, dict_to_pixbuf env = { "HOME_DIR": os.getenv('HOME'), "CONFIG_DIR": os.path.join(os.getenv('HOME'), '.config'), "CACHE_DIR": os.path.join(os.getenv('HOME'), '.cache') } def noop(): pass # 类型js的settimeout的修饰器 def set_timeout(timeout = 0.5): def decorator(callback): def wrapper(*args): t = threading.Timer(timeout, callback, args=args) t.start() return t return wrapper return decorator class WebEngine(WebKit2.WebView): __gsignals__ = { 'quit': (GObject.SignalFlags.RUN_FIRST, None, ()) } root = None window = None custom_bridge = None def __init__(self, window): WebKit2.WebView.__init__(self) self.window = window self.clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD) self.display = Gdk.Display.get_default() self.use(create_notify()) setting = create_setting() self.use(setting) im = self.get_input_method_context() # 解决输入法候选框跟随问题 im.set_enable_preedit(False) Inject(self, env).connect(self.called_by_js) def set_root(self, root): self.root = root return self.use(create_protocal(root)) def use(self, middle_ware = noop, extra = None): middle_ware(self, extra) return self def load(self, url = '/index.html'): if self.root is None: raise EnvironmentError('web root dir not set!') else: self.load_uri(f"app://{url}") def call_js(self, method, data = None, err = None): if err is not None: err = str(err) scripts = 'native.$emit("' + method + '", ' + json.dumps(err) + ', ' + json.dumps(data) + ')' self.evaluate_javascript(scripts, -1) def called_by_js(self, webview, message): data = json.loads(message.get_js_value().to_json(0)) event = data.get('event') callback = data.get('callback') params = data.get('data') output = None _error = None match event: case 'init': output = env case 'fs': _error, output = self._fs_setting(params) case 'clipboard': _error, output = self._clipboard_setting(params) # 退出app case 'quit': self.emit('quit') # 读取图片, 返回图片像素数据 case 'image': filename = params['value'] pixbuf = GdkPixbuf.Pixbuf.new_from_file(filename) output = pixbuf_to_dict(pixbuf, filename) case 'monitor': if params['action'] == 'get-all': monitor_num = self.display.get_n_monitors() monitors = [self.display.get_monitor(i) for i in range(monitor_num)] output = [get_monitor_info(m) for m in monitors] elif params['action'] == 'get-primary': monitor = self.display.get_primary_monitor() output = get_monitor_info(monitor) case 'keybinder': _error, output = self._keybinder_setting(params) case 'tray': if params['action'] == 'create': pass elif params['action'] == 'remove': pass case 'window': _error, output = self._window_setting(params) case 'notify': if self.notify is None: _error = ImportError('Notify module not found. Need to install gir1.2-notify-0.7 if you use debian.') else: title = params.get('title') summary = params.get('summary') icon = params.get('icon') progress = params.get('progress') urgency = params.get('urgency') self.notify.create(title, summary, icon, progress, urgency, params.get('callback')) case 'proxy': _error, output = self._proxy_setting(params) case 'md5': output = hashlib.md5(str(params.get('value'))).hexdigest() case _: if self.custom_bridge is None: pass else: _error, output = self.custom_bridge(event, params) # 有回调则返回结果 if callback: self.call_js(callback, output, _error) def _fs_setting(self, params = {}): _error = None output = None filepath = params.get('filepath') match(params.get('action')): case 'access': try: with open(filepath, params.get('mode')) as file: output = True except Exception as err: output = False case 'read': try: with open(filepath, params.get('mode')) as file: output = file.read() if params.get('mode').find('b') > -1: output = list(output) except Exception as err: _error = err case 'write': # 调整以支持二进制数据写入 try: with open(filepath, params.get('mode')) as file: buff = params['content'] if params.get('mode').find('b') > -1: buff = bytes(buff) output = file.write(buff) except Exception as err: _error = err case 'exists': output = os.path.exists(filepath) case 'list': with os.scandir(filepath) as entries: output = [{ "name": it.name, "path": os.path.join(filepath, it.name), "is_dir": it.is_dir(), "size": it.stat().st_size, "atime": int(it.stat().st_atime), "mtime": int(it.stat().st_mtime), } for it in entries] case 'remove': if os.path.isfile(filepath): output = os.remove(filepath) elif os.path.isdir(filepath): output = os.removedirs(filepath) case 'rename': if os.path.exists(filepath): output = shutil.move(filepath, params['target']) case 'copy': if os.path.exists(filepath): output = shutil.copy2(filepath, params['target']) case 'isfile': output = os.path.isfile(filepath) case 'isdir': output = os.path.isdir(filepath) return (_error, output) def _clipboard_setting(self, params = {}): _error = None output = None match(params.get('action')): # 读文本 case 'wait_for_text': output = self.clipboard.wait_for_text() # 写文本 case 'set_text': self.clipboard.set_text(params['value'], -1) # 写图片 case 'set_image': image = params['value'] # 前端传进来的值, 如果是路径的话, 直接读取 if type(image) == str: image = GdkPixbuf.Pixbuf.new_from_file(image) else: image = dict_to_pixbuf(image) self.clipboard.set_image(image) self.clipboard.store() # 读图片 case 'wait_for_image': output = self.clipboard.wait_for_image() output = pixbuf_to_dict(output, 'noname.png') # 清除剪切板 case 'clear': self.clipboard.clear() return (_error, output) def _keybinder_setting(self, params = {}): _error = None output = None keymap = params.get('value') shortcut_callback = params.get('shortcut_callback') or '' if params['action'] == 'register': # 绑定之前, 先解绑, 避免被重复绑定 if Keybinder: Keybinder.unbind(keymap) output = Keybinder.bind( keymap, lambda km : self.call_js(shortcut_callback) ) elif params['action'] == 'unregister': if Keybinder: Keybinder.unbind(keymap) output = True elif params['action'] == 'supported': if Keybinder: output = Keybinder.supported() else: output = False return (_error, output) def _window_setting(self, params = {}): _error = None output = None match(params.get('action')): case 'fullscreen': self.window.fullscreen() case 'unfullscreen': self.window.unfullscreen() case 'maximize': self.window.maximize() case 'unmaximize': self.window.unmaximize() case 'set_title': self.window.set_title(params['value'] or '') case 'resize': self.window.resize(params['value'].get('width'), params['value'].get('height')) case 'set_opacity': self.window.set_opacity(params['value']) case 'set_keep_above': self.window.set_keep_above(params['value']) case 'set_keep_below': self.window.set_keep_below(params['value']) case 'move': self.window.move(params['value'].get('x'), params['value'].get('y')) case 'toggle_visible': if self.window.is_visible(): self.window.hide() else: self.window.present() case 'hide': self.window.hide() case 'show': self.window.present() case 'is_visible': output = self.window.is_visible() return (_error, output) def _proxy_setting(self, params = {}): dm = self.get_website_data_manager() output = True _error = None if params['action'] == 'disable': dm.set_network_proxy_settings(WebKit2.NetworkProxyMode.NO_PROXY, None) elif params['action'] == 'system': dm.set_network_proxy_settings(WebKit2.NetworkProxyMode.DEFAULT, None) else: try: proxy = WebKit2.NetworkProxySettings(params['url'], params['ignore']) dm.set_network_proxy_settings(WebKit2.NetworkProxyMode.CUSTOM, proxy) except Exception as err: _error = err output = False return (_error, output)