xshot/draft/utils.py

91 lines
2.3 KiB
Python
Raw Permalink Normal View History

2023-08-30 19:22:53 +08:00
#!/usr/bin/env python3
import base64
import functools
import io
import threading

import gi
from gi.repository import Gdk, GLib, GdkPixbuf, Gio, GObject
from PIL import Image, ImageFilter
# Shared 1x1, 8-bit RGB pixbuf with an alpha channel. The loader helpers in
# this module return it as a placeholder when an image cannot be decoded.
# NOTE(review): the pixel buffer is never filled here, so its content is
# presumably arbitrary — confirm callers only use it as a "no image" marker.
empty_pixbuf = GdkPixbuf.Pixbuf.new(GdkPixbuf.Colorspace.RGB, True, 8, 1, 1)
def pixbuf_to_pil(pixbuf):
    """Build a PIL image from the pixel buffer of a pixbuf.

    Args:
        pixbuf: Object exposing the GdkPixbuf accessors ``get_pixels``,
            ``get_width``, ``get_height``, ``get_rowstride`` and
            ``get_has_alpha``.

    Returns:
        A PIL image in ``RGBA`` mode when the pixbuf reports an alpha
        channel, otherwise in ``RGB`` mode.
    """
    pil_mode = "RGBA" if pixbuf.get_has_alpha() else "RGB"
    dimensions = (pixbuf.get_width(), pixbuf.get_height())
    # The rowstride is forwarded to the raw decoder so that any per-row
    # padding in the pixbuf buffer is skipped correctly.
    return Image.frombytes(
        pil_mode,
        dimensions,
        pixbuf.get_pixels(),
        "raw",
        pil_mode,
        pixbuf.get_rowstride(),
    )
def pil_to_pixbuf(img):
    """Convert a PIL image into a GdkPixbuf.Pixbuf.

    Images that are not already ``RGB`` or ``RGBA`` are converted to
    ``RGBA`` first, because the pixbuf is created as 8-bit RGB with an
    optional alpha channel.

    Args:
        img: The PIL image to convert.

    Returns:
        A pixbuf holding the image pixels, or the module-level
        ``empty_pixbuf`` placeholder when the pixbuf cannot be created.
    """
    if img.mode not in ('RGB', 'RGBA'):
        # Single-band / palette / CMYK data cannot be handed to the pixbuf
        # constructor as-is; normalise to RGBA so the byte layout is known.
        img = img.convert('RGBA')
    has_alpha = img.mode == 'RGBA'
    width, height = img.size
    # The packed buffer from tobytes() has no row padding, so the rowstride
    # is exactly width * bytes-per-pixel (3 for RGB, 4 for RGBA).
    bytes_per_pixel = 4 if has_alpha else 3
    rowstride = width * bytes_per_pixel
    pixels = img.tobytes()
    try:
        return GdkPixbuf.Pixbuf.new_from_bytes(
            GLib.Bytes.new(pixels),
            GdkPixbuf.Colorspace.RGB,
            has_alpha,
            8,
            width,
            height,
            rowstride,
        )
    except Exception:
        # Best-effort conversion: fall back to the placeholder rather than
        # propagating a failure into the caller.
        return empty_pixbuf
def pic_to_pixbuf(pic):
    """Decode the image held in ``pic.data`` into a pixbuf.

    Args:
        pic: Object exposing a ``data`` attribute (presumably the raw bytes
            of an encoded image file — confirm against callers).

    Returns:
        The decoded pixbuf, or the module-level ``empty_pixbuf`` placeholder
        when the stream cannot be loaded as an image.
    """
    input_stream = Gio.MemoryInputStream.new_from_data(pic.data, None)
    try:
        return GdkPixbuf.Pixbuf.new_from_stream(input_stream, None)
    except Exception:
        # Best-effort load. Catching Exception instead of using a bare
        # except lets KeyboardInterrupt / SystemExit propagate.
        return empty_pixbuf
def base64_to_pixbuf(base64_str):
    """Decode a base64-encoded image into a pixbuf.

    Args:
        base64_str: Base64 text (or bytes) of an encoded image file.

    Returns:
        The decoded pixbuf, or the module-level ``empty_pixbuf`` placeholder
        when the input is not valid base64 or cannot be loaded as an image.
    """
    try:
        # The base64 decode is inside the try block so that malformed input
        # yields the same placeholder as an undecodable image, instead of
        # raising out of a helper that is otherwise best-effort.
        data = base64.b64decode(base64_str)
        input_stream = Gio.MemoryInputStream.new_from_data(data, None)
        return GdkPixbuf.Pixbuf.new_from_stream(input_stream, None)
    except Exception:
        # Catching Exception instead of using a bare except lets
        # KeyboardInterrupt / SystemExit propagate.
        return empty_pixbuf
def blur_image(pixbuf):
    """Return a Gaussian-blurred copy of ``pixbuf`` as a new pixbuf.

    Args:
        pixbuf: Source pixbuf; it is converted to a PIL image for processing.

    Returns:
        The pixbuf produced by ``pil_to_pixbuf`` from the blurred image.
    """
    # Load the picture and force RGBA mode so transparency is preserved.
    rgba = pixbuf_to_pil(pixbuf).convert('RGBA')
    blurred = rgba.filter(ImageFilter.GaussianBlur(radius = 6))
    # NOTE(review): the overlay colour has alpha 0, so compositing it looks
    # like it leaves the pixels unchanged — confirm whether a visible grey
    # tint was intended.
    overlay = Image.new('RGBA', rgba.size, (64, 64, 64, 0))
    blurred.alpha_composite(overlay)
    return pil_to_pixbuf(blurred)
# Async decorator: runs tasks that would otherwise block the main thread
# in a background thread.
def run_async(func):
    """Decorate ``func`` so that each call runs in a new daemon thread.

    Args:
        func: The callable to run in the background.

    Returns:
        A wrapper that accepts the same arguments as ``func``, starts a
        daemon ``threading.Thread`` executing it, and returns that thread
        (the return value of ``func`` itself is not available).
    """
    @functools.wraps(func)  # keep the decorated function's name/docstring
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        # Daemon threads do not keep the process alive at exit.
        thread.daemon = True
        thread.start()
        return thread
    return wrapper
# Decorator similar to JavaScript's setTimeout.
def set_timeout(timeout = 0.5):
    """Create a decorator that delays each call of the decorated callback.

    Args:
        timeout: Delay in seconds before the callback runs (default 0.5).

    Returns:
        A decorator. Calling the decorated function starts a
        ``threading.Timer`` that invokes the callback with the given
        positional and keyword arguments after ``timeout`` seconds, and
        returns the timer so the caller can ``cancel()`` it.
    """
    def decorator(callback):
        @functools.wraps(callback)  # keep the callback's name/docstring
        def wrapper(*args, **kwargs):
            timer = threading.Timer(timeout, callback, args=args, kwargs=kwargs)
            timer.start()
            return timer
        return wrapper
    return decorator
# Decorator that moves the decorated method onto the main thread
# (for when the method is called from a worker thread).
def idle(func):
    """Decorate ``func`` so that calls are scheduled through GLib's idle queue.

    Args:
        func: The callable to schedule.

    Returns:
        A wrapper that queues a single invocation of ``func`` with the given
        arguments via ``GLib.idle_add`` and returns ``None`` immediately.
    """
    @functools.wraps(func)  # keep the decorated function's name/docstring
    def wrapper(*args, **kwargs):
        def run_once():
            func(*args, **kwargs)
            # An idle callback is re-run for as long as it returns a truthy
            # value; always return False (GLib.SOURCE_REMOVE) so one call of
            # the wrapper yields exactly one call of func, whatever it returns.
            return False
        # GLib.idle_add replaces the deprecated GObject.idle_add alias.
        GLib.idle_add(run_once)
    return wrapper