1. 基本概念
1.1 GIL 全局解释器锁
在提到进程、线程和协程前,不得不提下GIL(Global Interpreter Lock),全局解释器锁。
GIL是一个互斥锁(mutex),是 CPython(Python 解释器)限制了同一时间内,一个进程里只能有一个线程运行。它阻止了 多个线程同时执行 Python 字节码,毫无疑问,这降低了执行效率。
Python 最初的设计理念在于,为了解决多线程之间数据完整性和状态同步的问题,设计为在任意时刻只有一个线程在解释器中运行。而当执行多线程程序时,由 GIL 来控制同一时刻只有一个线程能够运行。即 Python 中的多线程是表面多线程,也可以理解为‘假’多线程,不是真正的多线程。
为什么要这样做呢?举个例子,比如用 python 计算:n=n+1。这个操作被分成了四步:
- 加载全局变量 n
- 加载常数 1
- 进行二进制加法运算
- 将运算结果存入变量 n
以上的过程是非原子操作的,根据前面的线程释放 GIL 锁原则,线程 a 执行这四步的过程中,有可能会让出 GIL。如果这样,n=n+1 的运算过程就被打乱了。
这就是为什么我们说 GIL 是粗粒度的,它只保证了一定程度的安全。如果要做到线程的绝对安全,是不是所有的非 IO 操作,我们都需要自己再加一把锁呢?答案是否定的。在 python 中,有些操作是是原子级的,它本身就是一个字节码,GIL 无法在执行过程中释放。对于这种原子级的方法操作,我们无需担心它的安全。比如 sort 方法,[1,4,2].sort(),翻译成字节码就是 CALL METHOD 0。只有一行,无法再分,所以它是线程安全的。
同一时刻只有一个线程能够运行,那么是怎么执行多线程程序的呢?其实原理很简单:解释器的 分时复用 。即多个线程的代码, 轮流 被解释器 执行 ,只不过切换的很频繁很快,给人一种多线程“同时”在执行的错觉。聊的学术化一点,其实就是“ 并发”。
“并发”和“并行”:
- 并发:不同的代码块交替执行
- 并行:不同的代码块同时执行
GIL 锁最终是保证 Python 解释器中原子操作的线程安全。
GIL 是怎么起作用的:
- 由于 GIL 的机制,单核 CPU 在同一时刻只有一个线程在运行,当线程遇到 IO(读写)操作或 Timer Tick 到期,释放 GIL 锁。其他的两个线程去竞争这把锁,得到锁之后,才开始运行。
- 线程释放 GIL 锁有两种情况,一是遇到 IO 操作,二是 Time Tick 到期(执行完 100 个字节码指令或者 15ms)。IO 操作很好理解,比如发出一个 http 请求,等待响应。而 Time Tick 规定了线程的最长执行时间,超过时间后自动释放 GIL 锁。
在多核 CPU 下,由于 GIL 锁的全局特性,无法发挥多核的特性,GIL 锁会使得多线程任务的效率大大降低。线程 1(Thread1)在 CPU1 上运行,线程 2(Thread2)在 CPU2 上运行。GIL 是全局的,CPU2 上的 Thread2 需要等待 CPU1 上的 Thread1 让出 GIL 锁,才有可能执行。如果在多次竞争中,Thread1 都胜出,Thread2 没有得到 GIL 锁,意味着 CPU2 一直是闲置的,无法发挥多核的优势。为了避免同一线程霸占 CPU,在 python3.x 中,线程会自动的调整自己的优先级,使得多线程任务执行效率更高。
GIL 的优缺点:
GIL 的优点是显而易见的,GIL 可以保证我们在多线程编程时,无需考虑多线程之间数据完整性和状态同步的问题。
GIL 缺点是:我们的多线程程序执行起来是“并发”,而不是“并行”。因此执行效率会很低,会不如单线程的执行效率。
原子操作:
- 原子操作就是不会因为进程并发或者线程并发而导致被中断的操作。原子操作 的特点就是 要么一次全部执行,要么全不执行。不存在执行了一半而被中断的情况。
Python 解释器:
- python 解释器是有多个版本的:CPython, Jpython 等。CPython 就是用 C 语言实现 Python 解释器,JPython 是用 Java 实现 Python 解释器。那么 GIL 的问题实际上是存在于 CPython 中的。
最初是为了利用多核,Python 开始支持多线程。而解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁。后来发现这种‘加锁’是低效的。但 当大家试图去拆分和去除 GIL 的时候,发现大量库代码开发者已经重度依赖 GIL 而非常难以去除了。
在 Python 编程中,如果想利用计算机的多核提高程序执行效率,用多进程代替多线程。
使用多进程的好处:完全并行,无 GIL 的限制,可充分利用多 cpu 多核的环境。
虽说一般使用多进程对电脑系统资源占用比较多,但是在类 unix 系统中,创建线程的开销并不比进程小,因此在并发操作时,多线程的效率还是受到了很大制约的。所以后来人们发现通过 yield 来中断代码片段的执行,同时交出了 cpu 的使用权,于是协程的概念产生了。
1.2 进程、线程与协程
进程(process)是系统资源分配的最小单位,线程(thread)是程序执行的最小单位。
而协程(Coroutine)不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行)。

一个程序(进程)在计算机上运行时,操作系统会以进程为单位,分配系统资源(CPU 时间片、内存等资源),当这个进程存在多个线程时,由于 GIL 锁,系统资源的红箭头会随机指向其中一个进程,供其使用。遇到 IO 操作或者 Time Tick 到期(执行完 100 个字节码指令或者 15ms),该线程被设置成睡眠状态,红箭头就又会随机重新指向其中一个线程执行(按优先级),这就是多线程。
协程的概念应该是从进程和线程演变而来的,协程其实并不真正存在,它只是人为设想的一种产物,由程序或用户可随意切换执行。协程在子程序内部是可中断的,然后转而执行别的子程序,在适当的时候再返回来接着执行。
协程的特点在一个线程中执行,那和多线程比,协程有何优势?
- 极高的执行效率 :因为 子程序切换不是线程切换,而是由程序自身控制 ,因此, 没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显;
- 不需要多线程的锁机制 :因为只有一个线程,也不存在同时写变量冲突, 在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
当你程序中方法需要等待时间的话,就可以用协程,效率高,消耗资源少。
python 可以通过 yield/send 的方式实现 协程 。以此有程序员 控制函数的中断与执行。
在 Python3.4 正式引入了协程的概念,代码示例如下:
import asyncio
# Borrowed from http://curio.readthedocs.org/en/latest/tutorial.html.
@asyncio.coroutine
def countdown(number, n):
while n > 0:
print('T-minus', n, '({})'.format(number))
yield from asyncio.sleep(1)
n -= 1
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(countdown("A", 2)),
asyncio.ensure_future(countdown("B", 3))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
示例显示了在 Python3.4 引入两个重要概念 协程 和事件循环 。
通过修饰符 @asyncio.coroutine 定义了一个协程,而通过 event loop 来执行 tasks 中所有的协程任务。
之后在 Python3.5 引入了新的 async & await 语法,从而有了原生协程的概念。
1.3 如何选择多进程或多线程
1.3.1 根据应用场景选择
- 多进程,常用于任务的并行处理,适合 CPU 密集型任务,能够实现真正的并发。
- 多线程,也可用于任务的并发处理, 多个任务在同一时间段内交替执行,看起来“同时进行”。但是无法充分利多核的优势,因为 GIL 锁的存在,无法真正并行,但在 I/O 操作时会释放 GIL,因此线程仍然适 I/O 密集型任务
1.3.2 多进程池工具选择对比
multiprocessing 模块和 concurrent.futures.ProcessPoolExecutor 都是 Python 中用于实现 多进程并行计算 的工具,适用于 CPU 密集型任务(如科学计算、图像处理、大数据分析等),以绕过 CPython 的 GIL(全局解释器锁) 限制。但它们在接口设计、易用性、功能抽象层级等方面有明显区别。
| 特性 | multiprocessing.Pool |
ProcessPoolExecutor |
|---|---|---|
| 所属模块 | multiprocessing |
concurrent.futures |
| 接口风格 | 传统、底层 | 现代、高级(PEP 3148) |
| 返回值类型 | 直接结果或 AsyncResult |
统一返回 Future 对象 |
| 任务提交方式 | apply, apply_async, map, imap 等 |
submit, map |
| 结果获取 | 多种方式(阻塞 / 非阻塞) | 通过 Future.result() |
| 回调支持 | apply_async(callback=...) |
Future.add_done_callback() |
| 上下文管理 | 需手动 close() + join() |
支持 with 语句自动管理 |
| 异常处理 | 较复杂(需在回调或 get 中捕获) | 更统一(通过 Future.exception()) |
| 可组合性 | 较弱 | 与 as_completed, wait 等工具无缝集成 |
| 推荐程度 | 旧项目或特殊需求 | ✅ 新项目首选 |
2 多进程的实现
multiprocessing 是 Python 的标准模块,它既可以用来编写多进程,也可以用来编写多线程。
其中 Process 类是创建和管理子进程的核心工具。它可以让任务在 独立的进程 中运行,充分利用多核 CPU 的性能(并行)。
2.1 基本实例
import multiprocessing #导入多进程库
import time
def upload():
print(" 开始上传文件...")
time.sleep(5)
print(" 完成上传文件...")
def download():
print(" 开始下载文件...")
time.sleep(2)
print(" 完成下载文件...")
def main():
start=time.time()
### 同时开启两个子进程
multiprocessing.Process(target=upload).start()
multiprocessing.Process(target=download).start()
end=time.time()
print(' 主函数总耗时:%s'%(end-start))
if __name__ == '__main__':
begin = time.time()
main()
stop=time.time()
print(' 主程序总耗时:%s'%(stop-begin))
#输出结果:
主函数总耗时:0.02892470359802246
主程序总耗时:0.02892470359802246
开始上传文件...
开始下载文件...
完成下载文件...
完成上传文件...
这一个程序相当于有三个进程,该程序的这个 主进程 加上两个 子进程 upload 和 download。
upload 和 download 两个子进程是独立于主进程的,各自【新开】进程独立运行,不影响主进程。
2.2 核心 process 功能详解
# 构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组
target: 要执行的方法
name: 进程名
args/kwargs: 要传入方法的参数
# 实例方法:
is_alive():返回进程是否在运行,bool 类型。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的 timeout(可选参数)。
start():进程准备就绪,等待 CPU 调度
run():strat()调用 run 方法,如果实例进程时未制定传入 target,这 star 执行 t 默认 run()方法。
terminate():不管任务是否完成,立即停止工作进程
# 属性:
daemon:和线程的 setDeamon 功能一样
name:进程名字
pid:进程号 2.3 多进程之间的通讯
Process 之间肯定是需要通信的。Python 的 multiprocessing 模块包装了底层的机制,提供了 Queue、Pipes 等多种方式来交换数据。
- Queue队列是一种经常用于在多个进程之间传递数据的机制。它基于先进先出(FIFO)的原则,可以安全地在多个进程之间共享数据。队列适用于 大规模的数据传输,例如多个进程同时往队列中放入或取出数据。
- Pipe管道是一种用于在两个进程之间传递数据的机制。它由两个连接的端点组成,分别是发送端和接收端。通过创建管道对象,我们可以在进程之间传递数据。。管道适用于 小规模的数据传输,例如传递字符串、字节数据或数据对象。
2.3.1 Queue 队列通讯
我们以 Queue 为例,在父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据:
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 1 父进程创建 Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 2 启动子进程 pw,写入:
pw.start()
# 3 启动子进程 pr,读取:
pr.start()
# 4 等待 pw 结束:
pw.join()
# 5 pr 进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
用 put() 方法往队列中放入数据,用 get() 方法从队列中取出数据。
队列的主要特点包括:
- 支持多个进程之间的数据共享;
- 使用先进先出(FIFO)的原则;
- 适用于大规模的数据传输。
2.3.2 Pipe 管道通讯
from multiprocessing import Process, Pipe
def sender(conn):
conn.send("Hello from sender process!")
conn.close()
def receiver(conn):
message = conn.recv()
print("Received message:", message)
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe() #关键点,pipe 实例化生成一个双向管
sender_process = Process(target=sender, args=(child_conn,))
receiver_process = Process(target=receiver, args=(parent_conn,))
sender_process.start()
receiver_process.start()
sender_process.join()
receiver_process.join()
通过调用其 recv() 和send(message)方法在两个进程之间传递数据
管道的主要特点包括:
- 在两个进程之间传递数据;
- 是双向的,可以在父进程和子进程之间进行双向通信;
- 适用于小规模的数据传输。
2.4 进程数据共享
不同的进程通常有独立的内存空间,这使得进程间共享数据变得困难。
multiprocessing.Manager() 提供了一种方便的方式来创建可以在不同进程间共享的 共享对象(Shared Objects),例如共享的列表、字典、命名空间等。它本质上是启动了一个 管理器进程(Manager Process),其他工作进程通过 代理(Proxy) 访问这个管理器进程中的真实对象。
由于 Manager 是基于 代理 和 网络通信(Sockets) 实现的,所有对共享对象的访问都需要通过管理器进程进行序列化和反序列化(Pickling/Unpickling)以及进程间通信(IPC)。这比直接在同一进程内操作数据要慢得多,尤其是当操作频繁或数据量大时。
from multiprocessing import Process, Manager
def modify_list(shared_list):
shared_list.append(4)
def modify_dict(shared_dict):
shared_dict['key3'] = 'value3'
if __name__ == '__main__':
with Manager() as manager:
# 定义两个共享数据结构
shared_list = manager.list([1, 2, 3])
shared_dict = manager.dict({'key1': 'value1', 'key2': 'value2'})
# 开始启动子进程
p1 = Process(target=modify_list, args=(shared_list,))
p2 = Process(target=modify_dict, args=(shared_dict,))
p1.start()
p2.start()
p1.join()
p2.join()
# debug 打印输出
print(shared_list) # [1, 2, 3, 4]
print(shared_dict) # {'key1': 'value1', 'key2': 'value2', 'key3': 'value3'}
可以看到主进程定义了一个列表和一个字典,在子进程中,可以添加和修改字典的内容,在列表中插入新的数据,实现进程间的数据共享,即可以共同修改同一份数据。
2.5 multiprocessing 的进程池
2.5.1 基本介绍
当我们需要启动大量的独立任务时,并保持一定量的多任务并发,我们可以采用进程池,自动调节任务的启动。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
multiprocessing.Pool 是创建进程池的主要工具。pool = multiprocessing.Pool(10)
Pool 类有以下 4 种非常常用提交任务的方法。
- apply(func [, args [, kwargs]]):阻塞,任务其实是一个一个执行完的。无法实现并行效果。所以这个函数其实没啥用
- map(func, iterable, chunksize=None):与上一致。区别是针对多次运行同一个任务,如果只是参数不同,可以把参数做成一个迭代器。这个函数其实也没啥用
- apply_async(func [, args [, kwargs]], callback):非阻塞,且支持结果返回后进行回调
- map_async(func, iterable, chunksize, callback):与上一致,区别是针对多次运行同一个任务,如果只是参数不同,可以把参数做成一个迭代器。
进程池的管理函数:
- pool.close():关闭进程池(pool),使其不在接受新的任务。
- pool.terminal():结束工作进程,不在处理未处理的任务。
- pool.join():主进程阻塞等待子进程的退出,join 方法要在 close 或 terminate 之后使用。
2.5.2 async 异步进程池
apply_async 举例:
import os, time, random
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(random.randint(3,5))
return n**2
if __name__ == '__main__':
p=Pool(3) #创建 3 个子进程运行任务
res_l=[]
# 异步运行,根据进程池中有的进程数,每次最多 3 个子进程在异步执行
for i in range(10):
res = p.apply_async(work,args=(i,))
res_l.append(res)
p.close()
p.join() #主进程需要使用 join,等待进程池内任务都处理完,然后可以用 get 收集结果
for res in res_l:
print(res.get()) #使用 get 来获取 apply_aync 的结果
map_async 举例:
import os, time, random
from multiprocessing import Pool
def test_func(v):
print(v)
sleep(10/2-v/2) # v 越大 运行时间越少
return v
# 所需要测试的数据
data = range(10)
n_proc = 5
pool = mp.Pool(n_proc)
res = pool.map_async(test_func, data)
pool.close()
pool.join()
print(res.get())
2.5.3 async 返回值
方法 apply_async() 和map_async()的返回值是 AsyncResul 的实例 obj。实例具有以下方法:
obj.get():返回结果,如果有必要则等待结果到达。timeout 是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready():如果调用完成,返回 True
obj.successful():如果调用完成且没有引发异常,返回 True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]):等待结果变为可用。
2.5.4 回调函数
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了 I/O 的过程,直接拿到的是任务的结果。
import os, time, random
from multiprocessing import Pool
def log(v):
callback_res.append(v)
def test_func(v):
print(v)
sleep(10/2-v/2) # v 越大 运行时间越少
return v
data = range(10)
callback_res = []
n_proc = 5
pool = mp.Pool(n_proc)
for d in data:
pool.apply_async(test_func, (d,), callback=log)
pool.close()
pool.join()
print(callback_res) # [4, 3, 2, 1, 0, 6, 8, 7, 9, 5]
2.6 concurrent.futures 中的进程池
当我们需要启动大量的独立任务时,并保持一定量的多任务并发,我们可以采用进程池,自动调节任务的启动。
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
2.6.1 基本实例
concurrent.futures.ProcessPoolExecutor 是更现代的一种用来调用 进程池 的工具。
from concurrent.futures import ProcessPoolExecutor, as_completed
import os
import time
def cpu_bound_task(n):
pid = os.getpid()
start_time = time.time()
print(f"[{time.strftime('%H:%M:%S')}] 任务 {n} 在进程 {pid} 中开始执行 ")
total = sum(i * i for i in range(n * 8000000))
end_time = time.time()
duration = end_time - start_time
print(f"[{time.strftime('%H:%M:%S')}] 任务 {n} 在进程 {pid} 中结束,耗时 {duration:.2f} 秒 ")
return f" 任务 {n} 结果: {total}"
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=3) as executor:
# 提交 5 个任务
futures = [executor.submit(cpu_bound_task, i) for i in range(5, 0, -1)] # [5,4,3,2,1]
# 按完成顺序输出结果
for future in as_completed(futures):
result = future.result()
print(f"[{time.strftime('%H:%M:%S')}] 收到结果: {result}")
#结果输出:
[18:37:53] 任务 5 在进程 15352 中开始执行
[18:37:53] 任务 4 在进程 14736 中开始执行
[18:37:53] 任务 3 在进程 15192 中开始执行
[18:37:55] 任务 3 在进程 15192 中结束,耗时 2.11 秒
[18:37:55] 任务 2 在进程 15192 中开始执行
[18:37:55] 收到结果: 任务 3 结果: 4607999712000004000000
[18:37:56] 任务 4 在进程 14736 中结束,耗时 2.91 秒
[18:37:56] 收到结果: 任务 4 结果: 10922666154666672000000
[18:37:56] 任务 1 在进程 14736 中开始执行
[18:37:56] 任务 2 在进程 15192 中结束,耗时 1.38 秒
[18:37:56] 收到结果: 任务 2 结果: 1365333205333336000000
[18:37:56] 任务 1 在进程 14736 中结束,耗时 0.70 秒
[18:37:56] 收到结果: 任务 1 结果: 170666634666668000000
[18:37:56] 任务 5 在进程 15352 中结束,耗时 3.63 秒
[18:37:56] 收到结果: 任务 5 结果: 21333332533333340000000
根据上面测试,看时间,可以看到先提交前三个任务,任务 3 结束后,再加了任务 2,保持占满池子(3 个)
ProcessPoolExecutor 用法和线程池一模一样,接口完全一致。两者都继承自同一个抽象基类 Executor,因此两者用法完全相同
使用用法可以直接参考 3.4 章节。只需改一行代码即可切换线程 / 进程。
虽然接口一样,但底层行为不同。
2.6.2 外部执行 shell 脚本的两种方法
如果我们需要使用 python 执行一个外部的 shell 命令,这个 shell 命令是 CPU 密集型的耗时任务,有两个选择:
- 使用
subprocess.run或者subprocess.Popen
上述命令介绍见python 执行外部命令 | jqiange
这里主要介绍使用进程池来执行多个外部 shell 命令。实现两种方式:
- 使用
ProcessPoolExecutor+subprocess.run - 使用
ThreadPoolExecutor+subprocess.Popen
两种方案表格对比:
| 维度 | ProcessPoolExecutor + subprocess.run |
ThreadPoolExecutor + subprocess.Popen |
|---|---|---|
| 并发控制 | ✅ 支持(通过 max_workers) |
✅ 支持 |
| 资源开销 | ❌ 高:每个 worker 是一个完整 Python 进程(内存复制、IPC 开销) | ✅ 低:线程共享内存,仅启动外部进程 |
| 启动速度 | ❌ 慢(需 fork + import 模块) | ✅ 快(线程轻量) |
| 适用任务类型 | 理论上适合 CPU 密集型,但此处不必要 | 更适合 I/O 等待型(如等待外部命令) |
| 跨平台兼容性 | ⚠️ Windows 上 ProcessPoolExecutor 使用 spawn,启动更慢 |
✅ 更一致 |
| 能否杀死子进程树 | ❌ 困难(worker 进程可能无法访问子进程 PID) | ✅ 容易(在同一线程中管理 Popen 对象) |
| 超时控制 | ✅ subprocess.run(timeout=...) 支持 |
✅ Popen.communicate(timeout=...) 支持 |
| 内存占用(并发 4 个个任务) | 高(4 个 Python worker 进程 + 4 个 cad 进程) | 低(1 个 Python 进程 + 4 个 cad 进程) |
如果使用ProcessPoolExecutor +subprocess.run 会被认为是过度设计。
cad -f test.rule是 外部二进制程序 ,它运行在 独立的操作系统进程 中。- Python 的作用仅仅是:
- 启动它(
fork/exec) - 等待它结束
- 读取输出
- 启动它(
- 这个过程 不消耗 Python 的 CPU,只是“挂起等待”,属于 I/O 等待型操作(尽管
cad自身是 CPU 密集型)。 - 因此,用线程池完全足够,且更高效。
所以,推荐使用方法 2。
2.6.3 执行 shell 脚本方法 1
import subprocess
from concurrent.futures import ProcessPoolExecutor, as_completed
import os
import multiprocessing
# 假设你有 n 个 CPU 密集型 shell 脚本
script_paths = [
"./cpu_task1.sh",
"./cpu_task2.sh",
"./cpu_task3.sh",
"./cpu_task4.sh",
# ...
]
def run_cpu_intensive_script(script_path):
""" 在独立进程中运行 CPU 密集型 shell 脚本 """
try:
# 确保脚本可执行(可选)
os.chmod(script_path, 0o755)
result = subprocess.run([script_path], # 或 ["/bin/bash", script_path]
capture_output=True, # 自动捕获输出或异常
text=True, #返回字符串而非 bytes
check=False, # 不自动抛异常,手动处理 returncode
timeout=600 # 10 分钟超时(按需调整)
) # subprocess.run 是阻塞的
return {
"script": script_path,
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr
}
except subprocess.TimeoutExpired:
return {
"script": script_path,
"returncode": -1,
"stdout": "",
"stderr": "Script timed out"
}
except Exception as e:
return {
"script": script_path,
"returncode": -2,
"stdout": "",
"stderr": str(e)
}
def main():
max_workers = 5
with ProcessPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
futures = [executor.submit(run_cpu_intensive_script, script) for script in script_paths]
# 按完成顺序处理结果
for future in concurrent.futures.as_completed(futures):
res = future.result()
if res["returncode"] == 0:
print(f"{res['script']} 成功完成 ")
else:
print(f"{res['script']} 失败 (返回码: {res['returncode']})")
print(f" 错误: {res['stderr'][:200]}...") # 截断长错误
if __name__ == "__main__":
main()
2.6.4 执行 shell 脚本方法 2
使用threadPoolExecutor +subprocess.Popen 执行外部 shell 的 CPU 耗时任务
import subprocess
import concurrent.futures
import time
import signal
import os
WORK_DIR = "/path/to/your/workdir" # cad 所需的工作目录
def run_cad_task(task_id):
try:
# 启动子进程
proc = subprocess.Popen(["./run_cad.sh"],
cwd=WORK_DIR,
stdout=subprocess.PIPE, #通过管道将 "./run_cad.sh" 的结果传递给 proc.stdout
stderr=subprocess.PIPE, #这两行可以用 capture_output=True 代替
text=True, #返回字符串而非 bytes
preexec_fn=os.setsid # 创建新进程组(便于后续杀死整个进程树)
) #subprocess.Popen 是非阻塞的
# 等待完成或超时
try:
stdout, stderr = proc.communicate(timeout=3600*24)
#communicate 是阻塞的,必须阻塞,否则进程池控制失效
returncode = proc.returncode
except subprocess.TimeoutExpired:
# 超时:杀死整个进程组
try:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM) # 发送终止信号 signal.SIGTERM
proc.wait(timeout=10)
except:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL) # 发送强杀信号 signal.SIGKILL
proc.wait()
return (task_id, False, f" 任务超时 ")
# 检查结果
if returncode == 0:
return (task_id, True, " 成功完成 ")
else:
return (task_id, False, f" 失败,退出码: {returncode}\nstderr: {stderr[-500:]}") # 截断长错误
except Exception as e:
return (task_id, False, f" 异常: {str(e)}")
def main():
task_ids = [f"task_{i}" for i in range(10)] # 示例:10 个任务
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 提交所有任务
future_to_task = {
executor.submit(run_cad_task, tid): tid
for tid in task_ids
}
# 按完成顺序处理结果
for future in concurrent.futures.as_completed(future_to_task):
task_id, success, msg = future.result()
if success:
print(f"[{task_id}] {msg}")
else:
print(f"[{task_id}] {msg}")
if __name__ == "__main__":
main()
注意:在多线程任务
run_cad_task里面,必须阻塞,否则线程池会认为它立马执行结束,会再加新的任务,失去了控制并发数为 5 的初衷
这个方法考虑了从 subprocess.Popen 里面传递 log,如果不需要传递 log,只管提交,不等待,通过其他方法监控任务,可以采用:
import subprocess
import concurrent.futures
import time
import signal
import os
WORK_DIR = "/path/to/your/workdir" # cad 所需的工作目录
def run_cad_task(task_cmd):
proc = subprocess.Popen(
task_cmd, # 启动子进程 task_cmd
shell=True,
cwd=WORK_DIR,
preexec_fn=os.setsid # 创建新进程组(便于后续杀死整个进程树)
) #subprocess.Popen 是非阻塞的
proc.wait() # run_cad_task 函数必须阻塞,否则多进程失效
def submit_all_jobs(task_cmds):
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
future_to_task = {
executor.submit(run_cad_task, task): task
for task in task_cmds
}
print("all job are submited")
executor.shutdown(wait=False)
def main():
task_cmds = [...] # 示例:多个任务
submit_all_jobs(task_cmds)
if __name__ == "__main__":
main()
2.6.5 with 结构的坑
很多时候我们只想提交任务,不需要等到完成进行后续的操作,或者自有其他手段监控 log。
def submit_all_jobs(task_cmds):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# 提交所有任务
future_to_task = {
executor.submit(run_cad_task, task): task
for task in task_cmds
}
print("all job are submited")
##=================
### 使用 with 结构会发现,只有所有任务完成后才会打印 "all job are submited"
##=================
在使用 ThreadPoolExecutor 的 with 结构,只有当池子里面所有的任务全部完成后,才会退出 with 结构!!!
with ThreadPoolExecutor(...) as executor: 的作用域
with块结束时,会调用executor.shutdown(wait=True)(默认);- 这会等待所有已提交的任务(即
run_cad_task函数)完成; - 它会一直等到所有的子线程
run_cad_task结束才会退出 with 语句;
所以,下面采用不使用 with 结构:只管提交,不等待
def submit_all_jobs(task_cmds):
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
future_to_task = {
executor.submit(run_cad_task, task): task
for task in task_cmds
}
print("all job are submited")
executor.shutdown(wait=False)
所以我们不采用 with 结构,只管提交任务,打印”all job are submited”,而后通过其他手段监控任务。
仔细一想:如果 submit_all_jobs 瞬间执行结束,退出这个函数后,多线程对象 executor 还存在吗,队列还存在吗,会被销毁吗?
下节进行详细的介绍。
2.6.6 线程生命周期和子进程的独立性
几点结论:
已经由
subprocess.Popen启动的任务不受其他影响,进程仍然会继续在操作系统中运行。- 因为它们是独立的 Linux 进程,不依赖于 Python 线程或 executor 的存在。
executor = ThreadPoolExecutor只要还有工作线程在排队,其就不会被垃圾回收。- submit 是非阻塞式的,会把所有任务在瞬间提交,无论线程池是否忙碌
- 所有任务进入待执行队列。这个队列由 self._work_queue 管理,这个队列是 executor 实例的私有属性。
- 只要还有工作线程在运行(因为队列非空),executor 对象就不会被 GC(因为工作线程持有对它的弱引用或队列引用)。
- executor 的生命周期由内部任务和线程决定,而非局部变量作用域!
示例:
from concurrent.futures import ThreadPoolExecutor
import os
import time
def cpu_bound_task(n):
pid = os.getpid()
start_time = time.time()
print(f"[{time.strftime('%H:%M:%S')}] 任务 {n} 在进程 {pid} 中开始执行 ")
total = sum(i * i for i in range(n * 8000000)) # 缩小规模便于观察
end_time = time.time()
duration = end_time - start_time
print(f"[{time.strftime('%H:%M:%S')}] 任务 {n} 在进程 {pid} 中结束,耗时 {duration:.2f} 秒 ")
return f" 任务 {n} 结果: {total}"
def submit_all_jobs(taskids):
executor = ThreadPoolExecutor(max_workers=3)
# 提交 5 个任务
futures = [executor.submit(cpu_bound_task, i) for i in taskids] # [5,4,3,2,1]
print(" 所有任务已提交,等待结果...")
if __name__ == '__main__':
tskaids = [5,4,3,2,1]
submit_all_jobs(tskaids)
#输出
[15:24:31] 任务 5 在进程 9376 中开始执行
[15:24:31] 任务 4 在进程 9376 中开始执行
[15:24:32] 任务 3 在进程 9376 中开始执行
所有任务已提交,等待结果...
[15:24:37] 任务 3 在进程 9376 中结束,耗时 5.65 秒
[15:24:37] 任务 2 在进程 9376 中开始执行
[15:24:40] 任务 5 在进程 9376 中结束,耗时 8.59 秒
[15:24:40] 任务 1 在进程 9376 中开始执行
[15:24:41] 任务 4 在进程 9376 中结束,耗时 9.27 秒
[15:24:41] 任务 2 在进程 9376 中结束,耗时 3.42 秒
[15:24:41] 任务 1 在进程 9376 中结束,耗时 1.08 秒
2.6.6 在 GUI 中并发外部任务
在使用 GUI 提交并发任务(任务是外部 linux 命令)时,建议在 QThread 中维持线程 / 进程池任务,不要交给外部函数,不然会有一定的风险和不便之处。
import ...
class TaskRunner(QThread):
log_signal = pyqtSignal(str) # 用于 GUI 日志输出
def __init__(self):
super().__init__()
#...
def run_task(self, task):
proc = subprocess.Popen(
cmd,
shell=True,
cwd=str(self.work_dir)
)
proc.wait()
self.log_signal.emit(f"{cmd} finshed")
def run(self):
self.log_signal.emit("xxx")
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(self.run_task, cmd)
for cmd in self.tasks
]
# 可选:等待所有完成(但 GUI 不会卡,因为在线程中)
for future in futures:
future.result() # 确保异常能被抛出(可选)
### 这部分可要可不要
self.log_signal.emit("All tasks submitted and completed!")
class MainWindow(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("Long-Running Task Launcher")
self.resize(600, 400)
#...
self.runner = None
#def...
def start_tasks(self):
if self.runner and self.runner.isRunning():
self.append_log("⚠️ A task is already running!")
return
self.runner = TaskRunner(tasks, max_works, log_dir)
self.runner.log_signal.connect(self.show_log)
self.runner.start() #会执行里面的 run 函数
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec_())
上述例子中,self.runner = TaskRunner 维持了一个 TaskRunner 实例对象,由 TaskRunner 控制任务的启动
TaskRunner 的 run 函数会被 with…as executor 结构所阻塞
2.7 多进程进度条
采用 tqdm 生成进度条,然后使用 labmda 操作符,将其转换为函数,然后作为回调对象。那么进程池每处理完一个任务,就会调用一次 tqdm 的 update()。从而实现多进程的进度条显示。
from tqdm import tqdm
import multiprocessing as mp
from time import sleep
def test_func(v):
sleep(10/2-v/2) # v 越大 运行时间越少
return v
if __name__ == '__main__':
data = range(10)
pbar = tqdm(total=len(data), position=0, leave=True) #total 需要知道总数量才能计算进度百分比
pbar.set_description('Sleep')
update = lambda *args: pbar.update()
n_proc = 5
pool = mp.Pool(n_proc)
for d in data:
pool.apply_async(test_func, (d,), callback=update)
pool.close()
pool.join()
pbar.close()
3. 多线程的实现
当一个进程启动之后,会默认产生一个主线程,因为线程是程序执行流的最小单元。
当设置多线程时,主线程会创建多个子线程,在 python 中,主线程执行完自己的任务以后,就退出了,此时子线程会继续执行自己的任务,直到自己的任务结束。
不同线程共享同一个全局解释器锁(GIL),这意味着所有线程共享相同的全局命名空间。
Python 有一个全局解释器锁(GIL),它确保同一时间只有一个线程执行 Python 字节码, 并且所有线程共享相同的全局命名空间。
这意味着 线程在 CPU 密集型任务中并不能真正实现并行,但在 I/O 操作时会释放 GIL,因此线程仍然适 I/O 密集型任务
Python 的标准库提供了 threading 模块,来实现多线程编程。
3.1 基本实例
import threading
import time
def worker(name):
""" 工作函数 """
for i in range(3):
print(f" 线程 {name} 正在工作: {i}")
time.sleep(1)
# 创建线程
thread1 = threading.Thread(target=worker, args=("A",))
thread2 = threading.Thread(target=worker, args=("B",))
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成
thread1.join() # join() 阻塞当前线程,直到该线程结束(可设超时)
thread2.join()
print(" 所有线程完成 ")
# 输出
线程 A 正在工作: 0
线程 B 正在工作: 0
线程 B 正在工作: 1
线程 A 正在工作: 1
线程 A 正在工作: 2
线程 B 正在工作: 2
所有线程完成
3.2 核心 Thread 功能
# 构造方法
Thread(
group=None, # 保留参数,必须为 None
target=None, # 线程要执行的函数
name=None, # 线程名称(自动生成如 Thread-1)
args=(), # 传递给 target 的位置参数
kwargs={}, # 传递给 target 的关键字参数
daemon=None # 是否为守护线程(默认继承主线程)
)
# 实例方法
start() # 启动线程(调用后会执行 run())
join(timeout=None) # 阻塞当前线程,直到该线程结束(可设超时)
is_alive() # 判断线程是否仍在运行
getName() / setName() # 获取 / 设置线程名(现推荐用 name 属性)
daemon # 属性,设置或获取是否为守护线程 3.3 锁
当多个线程访问共享资源时,需要使用锁来避免竞态条件:
import threading
# 创建锁
lock = threading.Lock()
counter = 0
def increment():
global counter
for _ in range(100000):
with lock: # 使用锁保护共享资源
counter += 1
# 创建多个线程
threads = []
for i in range(5):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print(f" 最终计数: {counter}") #最终计数: 500000
3.4 concurrent.futures 中的线程池
3.4.1 基本介绍
采用 concurrent.futures 模块。池的好处是,对于多个线程,它能进行合理调控,比如可以设置同时只能进行 5 个任务,避免 python 占用过多的电脑资源,当运行中的 5 个任务完成了其中 3 个,池会自动进行补调,保证同时 5 个任务的进行。
线程池是一种高效管理和复用线程的工具,特别适用于 IO 密集型任务(如网络请求、文件读写)
from concurrent.futures import ThreadPoolExecutor
def download_one_page(url, name):
print('{} 对应的 {} 下载完毕 '.format(url,name))
if __name__ == "__main__":
with ThreadPoolExecutor(max_workers=3) as executor:
for url, name in result: # 每一次循环都启动一个新的线程
host_url = 'http://www.shuquge.com/txt/8659/'
executor.map(download_one_page, host_url + url, name)
3.4.2 核心方法 ThreadPoolExecutor
ThreadPoolExecutor(max_workers=None, # 最大线程数(默认 min(32, (os.cpu_count() or 1) + 4))
thread_name_prefix="", # 线程名前缀(便于调试)
initializer=None, # 每个工作线程启动时调用的初始化函数
initargs=() # 传递给 initializer 的参数)执行器 ThreadPoolExecutor 的方法:
submit(fn, *args, **kwargs)
- 提交一个可调用对象(函数)到线程池。
- 立即返回 一个
Future对象(不阻塞)。 - 适合不同函数或不同参数组合的任务。
map(fn, *iterables, timeout=None, chunksize=1)
- 将
fn应用于iterables中的每个元素。 - 返回一个 结果迭代器(惰性求值)。结果在遍历时才逐个获取。不直接返回
Future,无法单独设置回调 - 默认按输入顺序返回结果(即使任务完成顺序不同)。
shutdown(wait=True)
- 关闭线程池,不再接受新任务。
- 若
wait=True(默认),则等待所有已提交任务完成。 - 使用
with语句时会自动调用。
3.4.3 submit 示例
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
import time
def task(name, delay):
time.sleep(delay)
return f"{name} 完成,耗时 {delay}s"
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务
futures = [executor.submit(task, "A", 2),
executor.submit(task, "B", 1),
executor.submit(task, "C", 3)
]
# 按完成顺序获取结果(as_completed)
for future in as_completed(futures):
print(future.result())
### 输出顺序可能是 B → A → C(按完成时间),因为 as_completed 返回的是最先完成的任务。
- 每次调用返回一个
Future对象 - 可以混合不同函数、不同参数
- 需手动收集
Future并调用.result() - 支持
add_done_callback回调 - 结果默认 不保序(除非按原列表顺序遍历)
submit() 返回的 Future 对象代表一个 异步计算的结果,提供以下方法:
| 方法 | 说明 |
|---|---|
result(timeout=None) |
获取结果(阻塞,可设超时) |
exception(timeout=None) |
获取任务抛出的异常(若有) |
done() |
判断任务是否完成(True/False) |
add_done_callback(fn) |
注册回调函数(任务完成后自动调用) |
3.4.4 map 示例
from concurrent.futures import ThreadPoolExecutor
import time
def task(name, delay):
time.sleep(delay)
return f"{name} 完成,耗时 {delay}s"
with ThreadPoolExecutor(max_workers=3) as executor:
names = ["A", "B", "C"]
delays = [2, 1, 3]
results = executor.map(task, names, delays)
# 按输入顺序输出结果(即使 C 先完成,也会等 A、B 按序输出)
for res in results:
print(res)
- 所有任务调用 同一个函数
- 输入是 可迭代对象(如列表、元组),自动打包参数并分发任务
- 希望结果 按输入顺序返回,结果在遍历时才逐个获取。不直接返回
Future,无法单独设置回调
3.4.5 高级功能
- 按完成顺序处理任务:
as_completed
from concurrent.futures import as_completed
if __name__ == '__main__':
with ProcessPoolExecutor() as executor:
futures = [executor.submit(time.sleep, t) or t for t in [3, 1, 2]]
# 注意:上面写法不严谨,仅示意;实际应封装函数
# 谁先完成就先处理
for future in as_completed(futures):
print(f" 任务完成: {future.result()}")
- 等待一组任务:
wait
from concurrent.futures import wait, FIRST_COMPLETED
futures = [executor.submit(task, i) for i in range(5)]
# 等待第一个任务完成
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
print(f" 已完成: {len(done)}, 未完成: {len(not_done)}")
- 回调函数:
add_done_callback
from concurrent.futures import ThreadPoolExecutor, wait
import time
import random
# 模拟一个耗时任务(I/O 密集型)
def task(task_id):
# ...
return result
# 回调函数:接收一个 Future 对象作为参数
def callback(future):
try:
result = future.result() # 获取任务结果
print(f" 回调函数收到结果: {result}")
except Exception as exc:
print(f" 任务发生异常: {exc}")
# 主程序
if __name__ == "__main__":
with ThreadPoolExecutor(max_workers=3) as executor:
futures = []
for i in range(5):
future = executor.submit(task, i)
future.add_done_callback(callback) # 注册回调, 一旦任务完成,就会自动回调
futures.append(future)
# 可选:等待所有任务完成(其实 with 语句已隐式等待)
wait(futures)
4. 协程示例
async/await 关键字是出现在 python3.5 版本中的新功能,是一种关于 协程 的语法糖。从此 python 就正式有了原生协程的概念。
正常的函数在执行时是不会中断的,所以你要写一个能够中断的函数,就需要添加 async 关键词。
async 用来声明一个函数为异步函数,异步函数的特点是能在函数执行过程中挂起,去执行其他异步函数,等到挂起条件消失后,再回来执行。
await 用来声明程序挂起,比如异步程序执行到某一步时需要等待的时间很长,就将此挂起,去执行其他的异步程序,await 后面只能跟异步程序函数或有 __await__ 属性的对象,也就是说 await 表达式中的对象必须是 awaitable 的。
awaitable 对象必须满足如下条件中其中之一:
- 原生协程对象
- types.coroutine()修饰的 基于生成器的协程对象
- 实现了 await method,并在其中返回了iterator 的对象(可迭代对象)
举例一:
def main():
//1、定义异步函数
async def funcA(): #声明 funcA 为一个异步函数(或者叫协程函数)
await asyncio.sleep(4)
print('A 函数执行完毕 ')
async def funcB(): #定义的协程函数就是原生协程对象
await asyncio.sleep(2)
print('B 函数执行完毕 ')
async def funcD():
await asyncio.sleep(8)
print('D 函数执行完毕 ')
//2、创建一个事件循环
loop = asyncio.get_event_loop()
//3、将异步函数加入事件队列
tasks=[funcA(),funcB(),funcD()]
//4、执行事件队列, 直到最晚的一个事件被处理完毕后结束
loop.run_until_complete(asyncio.wait(tasks))
//5、如果不再使用 loop, 建议养成良好关闭的习惯
loop.close()
if __name__=='__main__':
start=time.time()
main()
end=time.time()
print(' 总耗时为:'+ str(end-start))
#输出结果为:
B 函数执行完毕
A 函数执行完毕
D 函数执行完毕
总耗时为:8.006016969680786s
举例二:
import asyncio
import requests
import time
async def download(url):
print("get %s" % url)
response = requests.get(url)
print(response.status_code)
async def wait_download(url):
await download(url) # 这里 download(url)就是一个原生的协程对象
print("get {} data complete.".format(url))
async def main():
start = time.time()
await asyncio.wait([wait_download("http://www.163.com"),
wait_download("http://www.mi.com"),
wait_download("http://www.baidu.com")])
end = time.time()
print("Complete in {} seconds".format(end - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
#运行结果:
get http://www.163.com
200
get http://www.163.com data complete.
get http://www.baidu.com
200
get http://www.baidu.com data complete.
get http://www.mi.com
200
get http://www.mi.com data complete.
Complete in 0.49027466773986816 seconds
程序可以运行,不过仍然有一个问题就是:它并没有真正地异步执行。
这里程序始终是同步执行的,这就说明仅仅是把涉及 I/O 操作的代码封装到 async 当中是不能实现异步执行的。必须使用支持异步操作的非阻塞代码才能实现真正的异步。目前支持非阻塞异步 I/O 的库是 aiohttp。
import asyncio
import aiohttp
import time
async def download(url): # 通过 async def 定义的函数是原生的协程对象
print("get: %s" % url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(resp.status)
# response = await resp.read()
# 此处的封装不再需要
# async def wait_download(url):
# await download(url)
# print("get {} data complete.".format(url))
async def main():
start = time.time()
await asyncio.wait([download("http://www.163.com"),
download("http://www.mi.com"),
download("http://www.baidu.com")])
end = time.time()
print("Complete in {} seconds".format(end - start))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
#测试结果:
get: http://www.mi.com
get: http://www.163.com
get: http://www.baidu.com
200
200
200
Complete in 0.27292490005493164 seconds
可以看出这次是真正的异步了。
欢迎各位看官及技术大佬前来交流指导呀,可以邮件至 jqiange@yeah.net