sonist-gtk/usr/lib/sonist/utils.py

91 lines
2.3 KiB
Python
Raw Normal View History

2023-08-17 20:51:59 +08:00
#!/usr/bin/env python3
2023-08-24 17:04:41 +08:00
import gi, io, base64, threading
from gi.repository import Gdk, GLib, GdkPixbuf, Gio, GObject
2023-08-17 20:51:59 +08:00
from PIL import Image, ImageFilter
2023-08-24 17:04:41 +08:00
# Shared 1x1 RGBA placeholder that the converters in this module return when
# an image cannot be turned into a pixbuf.
empty_pixbuf = GdkPixbuf.Pixbuf.new(GdkPixbuf.Colorspace.RGB, True, 8, 1, 1)
# Pixbuf.new() does not clear the pixel buffer, so make the placeholder
# deterministically fully transparent instead of showing leftover memory.
empty_pixbuf.fill(0x00000000)
2023-08-17 20:51:59 +08:00
def pixbuf_to_pil(pixbuf):
    """Convert a GdkPixbuf.Pixbuf into a PIL image.

    The pixel buffer is decoded with PIL's "raw" decoder as "RGBA" when the
    pixbuf reports an alpha channel and as "RGB" otherwise. The pixbuf's
    rowstride is forwarded as the decoder's line stride.
    """
    pixel_mode = "RGBA" if pixbuf.get_has_alpha() else "RGB"
    size = (pixbuf.get_width(), pixbuf.get_height())
    return Image.frombytes(
        pixel_mode,
        size,
        pixbuf.get_pixels(),
        "raw",
        pixel_mode,
        pixbuf.get_rowstride(),
    )
def pil_to_pixbuf(img):
    """Convert a PIL image into a GdkPixbuf.Pixbuf.

    Images that are not already "RGB" or "RGBA" (palette, greyscale, ...) are
    converted to "RGBA" first, because the pixbuf is built from packed 8-bit
    RGB(A) samples.

    Returns the module-level ``empty_pixbuf`` placeholder if the pixbuf cannot
    be created.
    """
    if img.mode not in ("RGB", "RGBA"):
        img = img.convert("RGBA")

    has_alpha = img.mode == "RGBA"
    width, height = img.size
    # tobytes() yields tightly packed rows, so the stride is exactly
    # width * bytes-per-pixel: 4 with alpha, 3 without.
    rowstride = width * (4 if has_alpha else 3)
    pixels = img.tobytes()

    try:
        return GdkPixbuf.Pixbuf.new_from_bytes(
            GLib.Bytes.new(pixels),
            GdkPixbuf.Colorspace.RGB,
            has_alpha,
            8,
            width,
            height,
            rowstride,
        )
    except Exception:
        # Best effort: callers expect a usable pixbuf rather than an error.
        return empty_pixbuf
2023-08-17 20:51:59 +08:00
def pic_to_pixbuf(pic):
    """Decode an embedded picture into a GdkPixbuf.Pixbuf.

    ``pic`` must expose the encoded image bytes as ``pic.data``
    (presumably a tag-library picture object -- confirm against callers).

    Returns the module-level ``empty_pixbuf`` placeholder if the bytes cannot
    be wrapped in a stream or decoded.
    """
    data = pic.data
    try:
        # Stream creation is guarded too, so unusable data (e.g. None) yields
        # the placeholder instead of an exception.
        input_stream = Gio.MemoryInputStream.new_from_data(data, None)
        return GdkPixbuf.Pixbuf.new_from_stream(input_stream, None)
    except Exception:
        # Best effort: fall back to the placeholder, but let
        # KeyboardInterrupt/SystemExit propagate (unlike a bare ``except:``).
        return empty_pixbuf
2023-08-21 19:06:22 +08:00
def base64_to_pixbuf(base64_str):
    """Decode a base64-encoded image into a GdkPixbuf.Pixbuf.

    Returns the module-level ``empty_pixbuf`` placeholder if the input is not
    valid base64 or the decoded bytes are not a loadable image.
    """
    try:
        # Decoding is inside the guarded region so malformed base64
        # (binascii.Error) degrades to the placeholder like any other failure.
        data = base64.b64decode(base64_str)
        input_stream = Gio.MemoryInputStream.new_from_data(data, None)
        return GdkPixbuf.Pixbuf.new_from_stream(input_stream, None)
    except Exception:
        # Best effort: fall back to the placeholder, but let
        # KeyboardInterrupt/SystemExit propagate (unlike a bare ``except:``).
        return empty_pixbuf
2023-08-21 19:06:22 +08:00
2023-08-17 20:51:59 +08:00
def blur_image(pixbuf):
    """Return a blurred, darkened copy of ``pixbuf`` as a new pixbuf.

    The image is Gaussian-blurred (radius 32) and then covered with a
    semi-transparent grey layer.
    """
    # Work in RGBA so the translucent overlay can be alpha-composited.
    blurred = (
        pixbuf_to_pil(pixbuf)
        .convert('RGBA')
        .filter(ImageFilter.GaussianBlur(radius=32))
    )
    # Uniform grey layer with alpha 160, composited in place over the blur.
    overlay = Image.new('RGBA', blurred.size, (64, 64, 64, 160))
    blurred.alpha_composite(overlay)
    return pil_to_pixbuf(blurred)
# Decorator that runs the wrapped function in a background thread, for tasks
# that would otherwise block the main thread.
def run_async(func):
    """Decorate ``func`` so each call runs in a new daemon thread.

    The decorated function returns the started ``threading.Thread``
    immediately; the return value of ``func`` itself is discarded.
    """
    from functools import wraps

    @wraps(func)  # keep func's __name__/__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        # Daemon threads do not keep the process alive at exit.
        thread.daemon = True
        thread.start()
        return thread

    return wrapper
# Decorator factory similar to JavaScript's setTimeout.
def set_timeout(timeout = 0.5):
    """Return a decorator that delays each call of the function by ``timeout`` seconds.

    Calling the decorated function starts a ``threading.Timer`` that invokes
    the original callback with the given positional and keyword arguments
    once the delay has elapsed. The started timer is returned so the caller
    can ``cancel()`` or ``join()`` it; the callback's return value is
    discarded.
    """
    from functools import wraps

    def decorator(callback):
        @wraps(callback)  # keep callback's __name__/__doc__ on the wrapper
        def wrapper(*args, **kwargs):
            timer = threading.Timer(timeout, callback, args=args, kwargs=kwargs)
            timer.start()
            return timer

        return wrapper

    return decorator
# Decorator that moves the wrapped method onto the main thread (for use when
# the method is called from a worker thread).
def idle(func):
    """Decorate ``func`` so calls are scheduled via ``GLib.idle_add``.

    The decorated function returns ``None`` immediately; ``func`` is invoked
    later by the GLib main loop with the given positional and keyword
    arguments.

    NOTE: GLib re-invokes an idle callback for as long as it returns a truthy
    value, so ``func`` should return ``None``/``False`` to run only once.
    """
    from functools import partial, wraps

    @wraps(func)  # keep func's __name__/__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        # GLib.idle_add replaces the deprecated GObject.idle_add alias.
        # partial() binds the arguments so keyword arguments reach func
        # instead of being interpreted by idle_add itself.
        GLib.idle_add(partial(func, *args, **kwargs))

    return wrapper