python3-webengine-gtk3/usr/lib/python3/dist-packages/webengine/gtk3/_webengine.py

543 lines
14 KiB
Python

#!/usr/bin/env python3
# @author yutent<yutent.io@gmail.com>
# @date 2023/08/08 14:07:26
import gi, os, sys, json
import webbrowser, shutil, hashlib, random
gi.require_version('Gtk', '3.0')
gi.require_version("WebKit2", "4.1")
from gi.repository import GObject, Gtk, Gdk, WebKit2, GLib, Gio, GdkPixbuf
from ._custom_window import create_same_window, create_custom_window
# 优先尝试使用指示器, 没有再使用 Gtk.StatusIcon
try:
# gir1.2-ayatanaappindicator3-0.1
gi.require_version('AyatanaAppIndicator3', '0.1')
from gi.repository import AyatanaAppIndicator3 as AppIndicator3
except:
AppIndicator3 = None
try:
# gir1.2-keybinder-3.0
gi.require_version("Keybinder", "3.0")
from gi.repository import Keybinder
# 初始化 Keybinder
Keybinder.init()
except:
Keybinder = None
from ._env import env
from ._version import version
from ._settings import create_setting
from ._protocal import create_protocal
from ._notify import create_notify
from ._inject import Inject
from ._utils import noop, get_monitor_info, pixbuf_to_dict, dict_to_pixbuf, run_async, idle
class WebEngine(WebKit2.WebView):
__gsignals__ = {
'quit': (GObject.SignalFlags.RUN_FIRST, None, ())
}
app_name = 'WebEngine'
app_version = version
uuid = None
root = None
window = None
opener = None
children = set()
custom_bridge = None
def __init__(self, win, opener = None, uuid = None):
WebKit2.WebView.__init__(self)
if uuid is None:
self.uuid = random.randbytes(8).hex()
else:
self.uuid = uuid
self.opener = opener
self.window = win
if opener is not None:
opener.children.add(self)
if win.get_title() is None:
win.set_title(self.app_name)
if win.get_icon() is None and win.get_icon_name() is None:
win.set_icon_name('web-browser')
self.clipboard = Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD)
self.display = Gdk.Display.get_default()
self.use(create_notify())
setting = create_setting()
self.use(setting)
im = self.get_input_method_context()
# 解决输入法候选框跟随问题
im.set_enable_preedit(False)
Inject(self, env).connect(self.called_by_js)
self.connect('create', self.create_new_window)
# 允许前端 widnow.close() 关闭窗口
self.connect('close', self.close_window)
# 通过外部关闭窗口时从父级中移除
win.connect("destroy", self.remove_from_opener)
win.connect('hide', lambda w: self.call_js('hide'))
win.connect('show', lambda w: self.call_js('show'))
def remove_from_opener(self, win = None):
if self.opener is not None:
try:
self.opener.children.remove(self)
except:
pass
def close_window(self, wv = None):
self.remove_from_opener()
self.window.close()
# 响应原生js的 window.open
def create_new_window(self, webview, nav):
req = nav.get_request()
return create_same_window(self, req)
# 手动创建窗口
def new_window_by_custom(self, options = {}):
return create_custom_window(self, options)
def set_root(self, root, has_protocol = False):
self.root = root
if has_protocol:
return self
return self.use(create_protocal(root))
def use(self, middle_ware = noop, extra = None):
middle_ware(self, extra)
return self
def load(self, url = '/index.html'):
if url.startswith('http://') or url.startswith('https://'):
self.load_uri(url)
else:
if self.root is None:
raise EnvironmentError('web root dir not set!')
else:
if url.startswith('/'):
self.load_uri(f"app://{url}")
elif url.startswith('app://'):
self.load_uri(url)
else:
raise ValueError('url must starts with "/" or "app://"')
@idle
def call_js(self, method, data = None, err = None):
if err is not None:
err = str(err)
scripts = 'native.$emit("' + method + '", ' + json.dumps(err) + ', ' + json.dumps(data) + ')'
self.evaluate_javascript(scripts, -1)
@run_async
def called_by_js(self, webview, message):
data = json.loads(message.get_js_value().to_json(0))
event = data.get('event')
callback = data.get('callback')
params = data.get('data')
output = None
_error = None
match event:
case 'app':
_error, output = self._app_handler(params)
case 'fs':
_error, output = self._fs_handler(params)
case 'clipboard':
_error, output = self._clipboard_handler(params)
# 读取图片, 返回图片像素数据
case 'image':
filename = params['value']
pixbuf = GdkPixbuf.Pixbuf.new_from_file(filename)
output = pixbuf_to_dict(pixbuf, filename)
case 'monitor':
if params['action'] == 'get-all':
monitor_num = self.display.get_n_monitors()
monitors = [self.display.get_monitor(i) for i in range(monitor_num)]
output = [get_monitor_info(m) for m in monitors]
elif params['action'] == 'get-primary':
monitor = self.display.get_primary_monitor()
output = get_monitor_info(monitor)
case 'keybinder':
_error, output = self._keybinder_handler(params)
case 'tray':
if params['action'] == 'create':
pass
elif params['action'] == 'remove':
pass
case 'opener':
callback = 'opener_message'
output = params.get('data')
uuid = json.dumps(self.uuid if self.opener else None)
scripts = f"native.$emit('opener_message', {json.dumps(output)}, {uuid})"
if self.opener is None:
self.evaluate_javascript(scripts, -1)
else:
self.opener.evaluate_javascript(scripts, -1)
return
case 'children':
callback = 'opener_message'
output = params.get('data')
uuid = params.get('uuid')
scripts = f"native.$emit('opener_message', {json.dumps(output)})"
if len(self.children) == 0:
self.evaluate_javascript(scripts, -1)
else:
if uuid is None:
for child in self.children:
child.evaluate_javascript(scripts, -1)
else:
for child in self.children:
if child.uuid == uuid:
child.evaluate_javascript(scripts, -1)
break
return
case 'window':
_error, output = self._window_handler(params)
case 'notify':
if self.notify is None:
_error = ImportError('Notify module not found. Need to install gir1.2-notify-0.7 if you use debian.')
else:
title = params.get('title')
summary = params.get('summary')
icon = params.get('icon')
progress = params.get('progress')
urgency = params.get('urgency')
self.notify.create(title, summary, icon, progress, urgency, params.get('callback'))
case 'proxy':
_error, output = self._proxy_handler(params)
case 'md5':
output = hashlib.md5(str(params.get('value'))).hexdigest()
case _:
if self.custom_bridge is not None:
try:
_error, output = self.custom_bridge(event, params) or (None, None)
except Exception as err:
print(err)
_error = err
# 有回调则返回结果
if callback:
self.call_js(callback, output, _error)
def _app_handler(self, params = {}):
_error = None
output = None
match(params.get('action')):
# 退出app
case 'quit':
self.close_window()
self.emit('quit')
case 'relaunch':
py = sys.executable
os.execl(py, py, *sys.argv)
return (_error, output)
def _shell_handler(self, params = {}):
_error = None
output = None
path = params.get('path')
match(params.get('action')):
case 'openExternal':
webbrowser.open(params.get('url'))
case 'showItemInFolder':
os.system(f"xdg-open '{path}'")
case 'openPath':
os.system(f"xdg-open '{path}'")
case 'trashItem':
pass
def _fs_handler(self, params = {}):
_error = None
output = None
filepath = params.get('filepath')
match(params.get('action')):
case 'access':
try:
file = open(filepath, params.get('mode'))
file.close()
output = True
except Exception as err:
output = False
case 'read':
try:
with open(filepath, params.get('mode')) as file:
output = file.read()
if params.get('mode').find('b') > -1:
output = list(output)
except Exception as err:
_error = err
case 'write':
# 调整以支持二进制数据写入
try:
with open(filepath, params.get('mode')) as file:
buff = params['content']
if params.get('mode').find('b') > -1:
buff = bytes(buff)
output = file.write(buff)
except Exception as err:
_error = err
case 'exists':
output = os.path.exists(filepath)
case 'list':
with os.scandir(filepath) as entries:
output = [{
"name": it.name,
"path": os.path.join(filepath, it.name),
"is_dir": it.is_dir(),
"size": it.stat().st_size,
"atime": int(it.stat().st_atime),
"mtime": int(it.stat().st_mtime),
} for it in entries]
case 'remove':
if os.path.isfile(filepath):
output = os.remove(filepath)
elif os.path.isdir(filepath):
output = os.removedirs(filepath)
case 'rename':
if os.path.exists(filepath):
output = shutil.move(filepath, params['target'])
case 'copy':
if os.path.exists(filepath):
output = shutil.copy2(filepath, params['target'])
case 'isfile':
output = os.path.isfile(filepath)
case 'isdir':
output = os.path.isdir(filepath)
case 'mkdir':
output = os.makedirs(filepath)
return (_error, output)
def _clipboard_handler(self, params = {}):
_error = None
output = None
match(params.get('action')):
# 读文本
case 'wait_for_text':
output = self.clipboard.wait_for_text()
# 写文本
case 'set_text':
self.clipboard.set_text(params['value'], -1)
# 写图片
case 'set_image':
image = params['value']
# 前端传进来的值, 如果是路径的话, 直接读取
if type(image) == str:
image = GdkPixbuf.Pixbuf.new_from_file(image)
else:
image = dict_to_pixbuf(image)
self.clipboard.set_image(image)
self.clipboard.store()
# 读图片
case 'wait_for_image':
output = self.clipboard.wait_for_image()
output = pixbuf_to_dict(output, 'noname.png')
# 清除剪切板
case 'clear':
self.clipboard.clear()
return (_error, output)
def _keybinder_handler(self, params = {}):
_error = None
output = None
keymap = params.get('value')
shortcut_callback = params.get('shortcut_callback') or ''
if params['action'] == 'register':
# 绑定之前, 先解绑, 避免被重复绑定
if Keybinder:
Keybinder.unbind(keymap)
output = Keybinder.bind(
keymap,
lambda km : self.call_js(shortcut_callback)
)
elif params['action'] == 'unregister':
if Keybinder:
Keybinder.unbind(keymap)
output = True
elif params['action'] == 'supported':
if Keybinder:
output = Keybinder.supported()
else:
output = False
return (_error, output)
def _window_handler(self, params = {}):
_error = None
output = None
match(params.get('action')):
case 'create':
self.new_window_by_custom(params.get('options'))
case 'close':
self.close_window()
case 'fullscreen':
self.window.fullscreen()
case 'unfullscreen':
self.window.unfullscreen()
case 'maximize':
self.window.maximize()
case 'unmaximize':
self.window.unmaximize()
case 'set_title':
self.window.set_title(params['value'] or '')
case 'resize':
self.window.resize(params['value'].get('width'), params['value'].get('height'))
case 'set_opacity':
self.window.set_opacity(params['value'])
case 'set_keep_above':
self.window.set_keep_above(params['value'])
case 'set_keep_below':
self.window.set_keep_below(params['value'])
case 'move':
self.window.move(params['value'].get('x'), params['value'].get('y'))
case 'toggle_visible':
if self.window.is_visible():
self.window.hide()
else:
self.window.present()
case 'hide':
self.window.hide()
case 'show':
self.window.present()
case 'is_visible':
output = self.window.is_visible()
return (_error, output)
def _proxy_handler(self, params = {}):
dm = self.get_website_data_manager()
output = True
_error = None
if params['action'] == 'disable':
dm.set_network_proxy_settings(WebKit2.NetworkProxyMode.NO_PROXY, None)
elif params['action'] == 'system':
dm.set_network_proxy_settings(WebKit2.NetworkProxyMode.DEFAULT, None)
else:
try:
proxy = WebKit2.NetworkProxySettings(params['url'], params['ignore'])
dm.set_network_proxy_settings(WebKit2.NetworkProxyMode.CUSTOM, proxy)
except Exception as err:
_error = err
output = False
return (_error, output)
基于webkit2封装的webview库, 提供傻瓜式的定制, 和一系列的js方法的注入, 增加前端js直接与系统交互的能力
Python 68.1%
JavaScript 30.4%
Shell 1.5%