python并行操作,您了解多少?(线程池ThreadPoolExecutor能力揭秘)
python版本的并行运行,极致使用CPU
文章目录
ThreadPoolExecutor利用计算机的并发处理能力,适合等待型任务,如文件读写、网络请求。其实也是受C++并行操作的影响,如果需要C++版,可以移步C++thread并行笔记
一、理论漫谈
1.1 什么是线程池
ThreadPoolExecutor
顾名思义为线程池,我们知道Python有GIL(全局解释器锁),线程无法真正实现并行计算,只能并发处理。且适用于I/O密集型任务,内存共享,并且可用内核数为单核。
1.2 线程池是用单核
那么问题就来了,下面我们会用os.cpu_count()
获取当前计算机的核数,为什么还要获取核数呢?
这是为了用 os.cpu_count()
来计算线程数是为了 “合理地分片任务量” ,而不是真正地并行占满每个 CPU 核心。它本质上是为了最大化线程池的吞吐效率,不是为了 CPU 利用率。
为什么这样做也是合理的?
情况 1: 任务是 I/O 密集型
比如你处理的是下载网页、请求接口、读写文件——这些操作虽然阻塞 Python 线程,但CPU 实际上是空闲的。这时,多线程并发就能提高效率。此时使用 os.cpu_count()
只是一个大致的“并发度建议”,你甚至可以设置更高的 max_workers(比如 2 × 核心数 或更多),只要不炸内存或 IO。
情况 2: 任务是 轻量的 CPU 操作
如果每个任务做的计算很轻,用多线程其实也没啥问题。比如每个函数只做一些小计算和少量 I/O,此时你可以用 os.cpu_count()
来做分片是合理的。
二、快速实践
2.1 明确自身cpu可并行的核数
max_workers = os.cpu_count()
2.2 根据所有任务计算在各个核上平均跑多少任务
use_cpu_pre_task = all_task_size // max_workers
2.3 最后把任务划分在不同的核上跑
def process_function(range_rask, arg1, arg2):
for i in range(ranges.start, ranges.stop):
XXX
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
for i in range(max_workers):
start_idx = i * use_cpu_pre_task
end_idx = all_task_size if i == max_workers - 1 else (i + 1) * use_cpu_pre_task
range_task = range(start_idx, end_idx)
executor.submit(process_function, range_task, arg1, arg2)
2.4 拿来主义
import concurrent.futures
def process_function(ranges, 参数1, 参数2):
for i in range(ranges.start, ranges.stop):
XXX
if __name__ == "__main__":
max_workers = os.cpu_count()
all_task_size = 所有任务数量
use_cpu_pre_task = max(all_task_size // max_workers, 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
for i in range(max_workers):
start_idx = i * use_cpu_pre_task
end_idx = all_task_size if i == max_workers - 1 else (i + 1) * use_cpu_pre_task
end_idx = start_idx if start_idx >= all_task_size else end_idx
range_task = range(start_idx, end_idx)
executor.submit(process_function, range_task, 参数1, 参数2)
2.5 完善
import concurrent.futures
from tqdm import tqdm
def process_function(ranges, 参数1, 参数2):
for i in tqdm(range(ranges.start, ranges.stop)):
XXX
if __name__ == "__main__":
max_workers = os.cpu_count()
all_task_size = 所有任务数量
use_cpu_pre_task = max(all_task_size // max_workers, 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for i in range(max_workers):
start_idx = i * use_cpu_pre_task
end_idx = all_task_size if i == max_workers - 1 else (i + 1) * use_cpu_pre_task
end_idx = start_idx if start_idx >= all_task_size else end_idx
range_task = range(start_idx, end_idx)
futures.append(executor.submit(process_function, range_task, 参数1, 参数2))
# 等待所有任务完成
for future in concurrent.futures.as_completed(futures):
try:
print("FINISH feature init.")
future.result() # 获取任务的返回结果
except Exception as e:
print("Task raised an exception:", e)
executor.shutdown()
2.6 终极武器:装饰器
def concurrent_processor(func: Callable) -> Callable:
"""
并发处理装饰器
:param func: 需要并发处理的函数
:return: 包装后的函数
"""
@wraps(func)
def wrapper(*args, **kwargs):
# 获取输入列表
input_list = args[0] if args else kwargs.get('input_list', [])
if not input_list:
return []
all_task_size = len(input_list)
max_workers = min(os.cpu_count(), len(input_list))
use_cpu_pre_task = max(all_task_size // max_workers, 1)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for i in range(max_workers):
start_idx = i * use_cpu_pre_task
end_idx = all_task_size if i == max_workers - 1 else (i + 1) * use_cpu_pre_task
end_idx = start_idx if start_idx >= all_task_size else end_idx
if start_idx < all_task_size:
# 根据函数签名决定如何传递参数
if len(inspect.signature(func).parameters) > 1:
futures.append(
executor.submit(
func,
input_list[start_idx:end_idx],
[]
)
)
else:
futures.append(
executor.submit(
func,
input_list[start_idx:end_idx]
)
)
# 等待所有任务完成并收集结果
results = []
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
results.extend(result if isinstance(result, list) else [result])
return results
return wrapper
对于你的要处理的函数(输入需要是list),直接用这个做装饰器即可。
- 举例:
@concurrent_processor
def process_images(img_path_list: List[str]) -> List[Any]:
"""
处理图片列表
:param img_path_list: 输入图片路径列表
:return: 处理后的图片列表
"""
output_list = []
for img_path in img_path_list:
result = cv2.imread(img_path)
if result is not None:
output_list.append(result)
return output_list
附录:进程池
对于 I/O 密集型操作(如文件读取),优先选择ThreadPoolExecutor(线程池),
对于 CPU 密集型任务,优先选择ProcessPoolExecutor(进程池)。
特性/名称 | ThreadPoolExecutor |
ProcessPoolExecutor |
---|---|---|
核心机制 | 多线程(thread) | 多进程(process) |
使用场景 | I/O 密集型任务 | CPU 密集型任务 |
全局解释器锁(GIL) | 受 GIL 限制 | 不受 GIL 限制 |
启动开销 | 较小 | 较大(创建新进程开销更高) |
数据共享 | 内存共享(线程共享内存) | 不共享内存,需通过序列化通信 |
可用内核数 | 单核(受 GIL 限制) | 多核(可充分利用多核 CPU) |
🧠 为什么会这样?
- Python 有 GIL(全局解释器锁),线程无法真正实现并行计算,只能并发处理。适合等待型任务,如文件读写、网络请求。
- 多进程不受 GIL 限制,可以多个核心并行工作,适合做大量 CPU 计算的任务,比如矩阵运算、图像处理、机器学习推理等。
线程池的优势:
- 避免了序列化问题
- 对于 I/O 密集型任务(如图像处理)来说,线程池的性能通常足够好
- 线程间共享内存,避免了进程间通信的开销
- 代码更简单,更容易维护
进程池需要注意的地方:
- 所有函数都定义在模块级别
- 所有参数都是可序列化的
- 避免使用闭包或局部函数
- 确保函数可以被正确导入
- 进程池举例
import numpy as np
import os
from collections import defaultdict
from tqdm import tqdm
import concurrent.futures
feature_folder = 'XXXXX' # 特征文件夹路径
features = defaultdict(list)
features_names = [f for f in os.listdir(feature_folder) if f.endswith('.npy')] # 过滤 .npy 文件
def func(feature_names):
local_features = defaultdict(list)
for feature_name in tqdm(feature_names):
img_name = os.path.splitext(feature_name)[0]
feature_path = os.path.join(feature_folder, feature_name)
feature_vector = np.load(feature_path)
feature_tuple = tuple(feature_vector.flatten())
local_features[feature_tuple].append(img_name)
return local_features
def main():
max_workers = os.cpu_count()
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
# 将文件名列表分成块
chunk_size = max(len(features_names) // max_workers, 1)
futures = []
for i in range(0, len(features_names), chunk_size):
chunk = features_names[i:i + chunk_size]
futures.append(executor.submit(func, chunk))
# 合并所有结果
with tqdm(total=len(futures)) as pbar:
for future in concurrent.futures.as_completed(futures):
local_features = future.result()
for k, v in local_features.items():
features[k].extend(v)
pbar.update(1) # 每完成一个任务更新进度
if __name__ == "__main__":
main()

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)