多线程
AScript 使用 Python 原生的多线程能力. 主要通过 threading 模块实现.
# 导包
import threading
创建线程
函数方式创建
import threading
import time
def task(name):
for i in range(3):
print(f"线程 {name}: 第{i+1}次执行")
time.sleep(1)
# 创建线程
t = threading.Thread(target=task, args=("A",))
# 启动线程
t.start()
print("主线程继续执行, 不会被阻塞")
同时启动多个线程
import threading
import time
def task(name, count):
for i in range(count):
print(f"线程 {name}: {i+1}")
time.sleep(0.5)
# 创建多个线程
t1 = threading.Thread(target=task, args=("A", 3))
t2 = threading.Thread(target=task, args=("B", 5))
t1.start()
t2.start()
print("两个线程已同时启动")
等待线程结束
使用 join() 等待线程执行完毕后再继续
import threading
import time
def download(url):
print(f"开始下载: {url}")
time.sleep(2)
print(f"下载完成: {url}")
t1 = threading.Thread(target=download, args=("文件A",))
t2 = threading.Thread(target=download, args=("文件B",))
t1.start()
t2.start()
# 等待两个线程都执行完毕
t1.join()
t2.join()
print("所有下载已完成")
带超时的等待
import threading
import time
def slow_task():
time.sleep(10)
t = threading.Thread(target=slow_task)
t.start()
# 最多等3秒
t.join(timeout=3)
if t.is_alive():
print("线程还在运行, 不再等待")
else:
print("线程已完成")
守护线程
守护线程会在主线程结束时自动退出, 适合后台任务.
import threading
import time
def background_monitor():
while True:
print("后台监控中...")
time.sleep(2)
# 设置为守护线程
t = threading.Thread(target=background_monitor, daemon=True)
t.start()
# 主线程5秒后结束, 守护线程也会自动退出
time.sleep(5)
print("主线程结束")
线程锁
多个线程同时修改共享数据时, 需要加锁防止数据错乱.
基本锁
import threading
count = 0
lock = threading.Lock()
def add():
global count
for _ in range(100000):
lock.acquire()
count += 1
lock.release()
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终结果: {count}") # 200000
使用 with 语句(推荐)
import threading
count = 0
lock = threading.Lock()
def add():
global count
for _ in range(100000):
with lock:
count += 1
t1 = threading.Thread(target=add)
t2 = threading.Thread(target=add)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"最终结果: {count}") # 200000
线程间通信
使用 Event 信号
import threading
import time
event = threading.Event()
def wait_for_signal():
print("等待信号...")
event.wait() # 阻塞直到 event 被 set
print("收到信号, 开始执行!")
def send_signal():
time.sleep(3)
print("发送信号!")
event.set()
t1 = threading.Thread(target=wait_for_signal)
t2 = threading.Thread(target=send_signal)
t1.start()
t2.start()
使用 Queue 队列
线程安全的数据传递方式, 适合生产者-消费者模式.
import threading
import queue
import time
q = queue.Queue()
def producer():
for i in range(5):
item = f"任务{i+1}"
q.put(item)
print(f"生产: {item}")
time.sleep(0.5)
q.put(None) # 结束信号
def consumer():
while True:
item = q.get()
if item is None:
break
print(f"消费: {item}")
time.sleep(1)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
print("全部处理完毕")
定时器
延迟执行一个函数
import threading
def delayed_task():
print("3秒后执行了!")
# 3秒后执行 delayed_task
timer = threading.Timer(3.0, delayed_task)
timer.start()
print("定时器已设置")
取消定时器
import threading
def task():
print("这不会执行")
timer = threading.Timer(5.0, task)
timer.start()
# 取消定时器
timer.cancel()
print("定时器已取消")
周期性执行
import threading
def repeat_task(interval, func):
"""每隔 interval 秒执行一次 func"""
func()
timer = threading.Timer(interval, repeat_task, args=(interval, func))
timer.daemon = True
timer.start()
return timer
def check():
print("定期检查中...")
# 每5秒执行一次
repeat_task(5, check)
# 保持主线程运行
import time
time.sleep(30)
线程池
使用 concurrent.futures 模块管理线程池, 适合批量并发任务.