#!/usr/bin/env python3
"""Helpers for converting between GdkPixbuf and PIL images, plus threading decorators."""
import base64
import functools
import io
import threading

import gi
from gi.repository import Gdk, GLib, GdkPixbuf, Gio, GObject
from PIL import Image, ImageFilter

# 1x1 RGBA pixbuf returned as a fallback whenever a conversion fails.
empty_pixbuf = GdkPixbuf.Pixbuf.new(GdkPixbuf.Colorspace.RGB, True, 8, 1, 1)
# Pixbuf.new() does not clear the pixel buffer, so make the fallback
# deterministically transparent instead of leaving whatever was in memory.
empty_pixbuf.fill(0x00000000)


def pixbuf_to_pil(pixbuf):
    """Convert a GdkPixbuf.Pixbuf into a PIL image.

    The resulting image is "RGBA" when the pixbuf has an alpha channel and
    "RGB" otherwise.
    """
    mode = "RGBA" if pixbuf.get_has_alpha() else "RGB"
    size = (pixbuf.get_width(), pixbuf.get_height())
    # The rowstride is handed to the raw decoder so it knows the byte length
    # of each row in the pixbuf's buffer.
    return Image.frombytes(
        mode, size, pixbuf.get_pixels(), "raw", mode, pixbuf.get_rowstride()
    )


def pil_to_pixbuf(img):
    """Convert a PIL image into a GdkPixbuf.Pixbuf.

    "RGB" and "RGBA" images are used as they are; any other mode is first
    converted to "RGBA". Returns ``empty_pixbuf`` if the pixbuf cannot be
    created.
    """
    if img.mode not in ("RGB", "RGBA"):
        img = img.convert("RGBA")
    has_alpha = img.mode == "RGBA"
    channels = 4 if has_alpha else 3
    width, height = img.size
    # tobytes() yields tightly packed rows, so the rowstride is exactly
    # width * channels (3 for RGB, 4 for RGBA).
    pixels = GLib.Bytes.new(img.tobytes())
    try:
        return GdkPixbuf.Pixbuf.new_from_bytes(
            pixels,
            GdkPixbuf.Colorspace.RGB,
            has_alpha,
            8,
            width,
            height,
            width * channels,
        )
    except Exception:
        # Best-effort: callers always receive a usable pixbuf.
        return empty_pixbuf


def _bytes_to_pixbuf(data):
    """Decode encoded image bytes into a pixbuf, or ``empty_pixbuf`` on failure."""
    input_stream = Gio.MemoryInputStream.new_from_data(data, None)
    try:
        return GdkPixbuf.Pixbuf.new_from_stream(input_stream, None)
    except Exception:
        # Best-effort: undecodable data yields the fallback pixbuf.
        return empty_pixbuf


def pic_to_pixbuf(pic):
    """Decode the image bytes stored in ``pic.data`` into a pixbuf.

    Returns ``empty_pixbuf`` if the data cannot be decoded.
    """
    return _bytes_to_pixbuf(pic.data)


def base64_to_pixbuf(base64_str):
    """Decode a base64-encoded image into a pixbuf.

    Errors raised by ``base64.b64decode`` propagate to the caller; image
    data that cannot be decoded yields ``empty_pixbuf``.
    """
    return _bytes_to_pixbuf(base64.b64decode(base64_str))


def blur_image(pixbuf, radius=32, mask_color=(64, 64, 64, 160)):
    """Return a blurred copy of ``pixbuf`` with a colour overlay.

    Args:
        pixbuf: Source GdkPixbuf.Pixbuf.
        radius: Gaussian blur radius.
        mask_color: RGBA tuple composited over the blurred image.

    Returns:
        A new pixbuf, or ``empty_pixbuf`` if the conversion back fails.
    """
    # Load the image and make sure it is RGBA so transparency is kept.
    img = pixbuf_to_pil(pixbuf).convert('RGBA')
    mask = Image.new('RGBA', img.size, mask_color)
    img = img.filter(ImageFilter.GaussianBlur(radius=radius))
    img.alpha_composite(mask)
    return pil_to_pixbuf(img)


# Async decorator: runs work that would block the main thread in a
# background thread instead.
def run_async(func):
    """Decorator that runs ``func`` in a daemon thread.

    Calling the decorated function starts the thread and returns the
    ``threading.Thread`` object.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        thread = threading.Thread(target=func, args=args, kwargs=kwargs)
        thread.daemon = True
        thread.start()
        return thread
    return wrapper


# Decorator similar to JavaScript's setTimeout.
def set_timeout(timeout=0.5):
    """Decorator factory that delays each call by ``timeout`` seconds.

    Calling the decorated function starts a ``threading.Timer`` and returns
    it, so the caller may ``cancel()`` the pending call.
    """
    def decorator(callback):
        @functools.wraps(callback)
        def wrapper(*args, **kwargs):
            timer = threading.Timer(timeout, callback, args=args, kwargs=kwargs)
            timer.start()
            return timer
        return wrapper
    return decorator


# Decorator that moves the decorated method onto the main thread
# (for methods called from a background thread).
def idle(func):
    """Decorator that schedules ``func`` through ``GLib.idle_add``.

    The decorated function returns ``None`` immediately; ``func`` is invoked
    once from the GLib main loop.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        def run_once():
            func(*args, **kwargs)
            # An idle callback that returns a true value is scheduled again;
            # always remove the source so func runs exactly once.
            return GLib.SOURCE_REMOVE
        GLib.idle_add(run_once)
    return wrapper