在这里插入图片描述

ThreadPoolExecutor利用计算机的并发处理能力,适合等待型任务,如文件读写、网络请求。其实也是受C++并行操作的影响,如果需要C++版,可以移步C++thread并行笔记

一、理论漫谈

1.1 什么是线程池

ThreadPoolExecutor顾名思义为线程池,我们知道Python有GIL(全局解释器锁),线程无法真正实现并行计算,只能并发处理。且适用于I/O密集型任务,内存共享,并且可用内核数为单核。

1.2 线程池是用单核

那么问题就来了,下面我们会用os.cpu_count()获取当前计算机的核数,为什么还要获取核数呢?
这是为了用 os.cpu_count() 来计算线程数是为了 “合理地分片任务量” ,而不是真正地并行占满每个 CPU 核心。它本质上是为了最大化线程池的吞吐效率,不是为了 CPU 利用率。

为什么这样做也是合理的?

情况 1: 任务是 I/O 密集型
比如你处理的是下载网页、请求接口、读写文件——这些操作虽然阻塞 Python 线程,但CPU 实际上是空闲的。这时,多线程并发就能提高效率。此时使用 os.cpu_count() 只是一个大致的“并发度建议”,你甚至可以设置更高的 max_workers(比如 2 × 核心数 或更多),只要不炸内存或 IO。

情况 2: 任务是 轻量的 CPU 操作
如果每个任务做的计算很轻,用多线程其实也没啥问题。比如每个函数只做一些小计算和少量 I/O,此时你可以用 os.cpu_count() 来做分片是合理的。

二、快速实践

2.1 明确自身cpu可并行的核数

max_workers = os.cpu_count()

2.2 根据所有任务计算在各个核上平均跑多少任务

use_cpu_pre_task = all_task_size // max_workers

2.3 最后把任务划分在不同的核上跑

def process_function(range_rask, arg1, arg2):
	for i in range(ranges.start, ranges.stop):
		XXX

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    for i in range(max_workers):
        start_idx = i * use_cpu_pre_task
        end_idx = all_task_size if i == max_workers - 1 else (i + 1) * use_cpu_pre_task
        range_task = range(start_idx, end_idx)
        executor.submit(process_function, range_task, arg1, arg2)

2.4 拿来主义

import concurrent.futures
def process_function(ranges, 参数1, 参数2):
	for i in range(ranges.start, ranges.stop):
		XXX

if __name__ == "__main__":
	max_workers = os.cpu_count()
	all_task_size = 所有任务数量
	use_cpu_pre_task = max(all_task_size // max_workers, 1)
	with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
	    for i in range(max_workers):
	        start_idx = i * use_cpu_pre_task
	        end_idx = all_task_size if i == max_workers - 1 else (i + 1) * use_cpu_pre_task
	        end_idx = start_idx if start_idx >= all_task_size else end_idx
	        range_task = range(start_idx, end_idx)
	        executor.submit(process_function, range_task, 参数1, 参数2)

2.5 完善

import concurrent.futures
from tqdm import tqdm

def process_function(ranges, 参数1, 参数2):
	for i in tqdm(range(ranges.start, ranges.stop)):
		XXX

if __name__ == "__main__":
	max_workers = os.cpu_count()
	all_task_size = 所有任务数量
	use_cpu_pre_task = max(all_task_size // max_workers, 1)
	with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
		futures = []
	    for i in range(max_workers):
	        start_idx = i * use_cpu_pre_task
	        end_idx = all_task_size if i == max_workers - 1 else (i + 1) * use_cpu_pre_task
	        end_idx = start_idx if start_idx >= all_task_size else end_idx
	        range_task = range(start_idx, end_idx)
	        futures.append(executor.submit(process_function, range_task, 参数1, 参数2))
	    # 等待所有任务完成
        for future in concurrent.futures.as_completed(futures):
            try:
                print("FINISH feature init.")
                future.result()  # 获取任务的返回结果
            except Exception as e:
                print("Task raised an exception:", e)
        executor.shutdown()

2.6 终极武器:装饰器

def concurrent_processor(func: Callable) -> Callable:
    """
    并发处理装饰器
    :param func: 需要并发处理的函数
    :return: 包装后的函数
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 获取输入列表
        input_list = args[0] if args else kwargs.get('input_list', [])
        
        if not input_list:
            return []
            
        all_task_size = len(input_list)
        max_workers = min(os.cpu_count(), len(input_list))
        use_cpu_pre_task = max(all_task_size // max_workers, 1)
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for i in range(max_workers):
                start_idx = i * use_cpu_pre_task
                end_idx = all_task_size if i == max_workers - 1 else (i + 1) * use_cpu_pre_task
                end_idx = start_idx if start_idx >= all_task_size else end_idx
                
                if start_idx < all_task_size:
                    # 根据函数签名决定如何传递参数
                    if len(inspect.signature(func).parameters) > 1:
                        futures.append(
                            executor.submit(
                                func,
                                input_list[start_idx:end_idx],
                                []
                            )
                        )
                    else:
                        futures.append(
                            executor.submit(
                                func,
                                input_list[start_idx:end_idx]
                            )
                        )
            
            # 等待所有任务完成并收集结果
            results = []
            for future in concurrent.futures.as_completed(futures):
                result = future.result()
                if result:
                    results.extend(result if isinstance(result, list) else [result])
            
        return results
    return wrapper

对于你的要处理的函数(输入需要是list),直接用这个做装饰器即可。

  • 举例:
@concurrent_processor
def process_images(img_path_list: List[str]) -> List[Any]:
    """
    处理图片列表
    :param img_path_list: 输入图片路径列表
    :return: 处理后的图片列表
    """
    output_list = []
    for img_path in img_path_list:
        result = cv2.imread(img_path)
        if result is not None:
            output_list.append(result)
    return output_list

附录:进程池

对于 I/O 密集型操作(如文件读取),优先选择ThreadPoolExecutor(线程池),
对于 CPU 密集型任务,优先选择ProcessPoolExecutor(进程池)。

特性/名称 ThreadPoolExecutor ProcessPoolExecutor
核心机制 多线程(thread) 多进程(process)
使用场景 I/O 密集型任务 CPU 密集型任务
全局解释器锁(GIL) 受 GIL 限制 不受 GIL 限制
启动开销 较小 较大(创建新进程开销更高)
数据共享 内存共享(线程共享内存) 不共享内存,需通过序列化通信
可用内核数 单核(受 GIL 限制) 多核(可充分利用多核 CPU)

🧠 为什么会这样?

  • Python 有 GIL(全局解释器锁),线程无法真正实现并行计算,只能并发处理。适合等待型任务,如文件读写、网络请求。
  • 多进程不受 GIL 限制,可以多个核心并行工作,适合做大量 CPU 计算的任务,比如矩阵运算、图像处理、机器学习推理等。

线程池的优势:

  1. 避免了序列化问题
  2. 对于 I/O 密集型任务(如图像处理)来说,线程池的性能通常足够好
  3. 线程间共享内存,避免了进程间通信的开销
  4. 代码更简单,更容易维护

进程池需要注意的地方:

  1. 所有函数都定义在模块级别
  2. 所有参数都是可序列化的
  3. 避免使用闭包或局部函数
  4. 确保函数可以被正确导入
  • 进程池举例
import numpy as np
import os
from collections import defaultdict
from tqdm import tqdm
import concurrent.futures

feature_folder = 'XXXXX'  # 特征文件夹路径
features = defaultdict(list)
features_names = [f for f in os.listdir(feature_folder) if f.endswith('.npy')]  # 过滤 .npy 文件


def func(feature_names):
    local_features = defaultdict(list)
    for feature_name in tqdm(feature_names):
        img_name = os.path.splitext(feature_name)[0]
        feature_path = os.path.join(feature_folder, feature_name)
        feature_vector = np.load(feature_path)
        feature_tuple = tuple(feature_vector.flatten())
        local_features[feature_tuple].append(img_name)

    return local_features


def main():
    max_workers = os.cpu_count()
    
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        # 将文件名列表分成块
        chunk_size = max(len(features_names) // max_workers, 1)
        futures = []
        
        for i in range(0, len(features_names), chunk_size):
            chunk = features_names[i:i + chunk_size]
            futures.append(executor.submit(func, chunk))

        # 合并所有结果
        with tqdm(total=len(futures)) as pbar:
            for future in concurrent.futures.as_completed(futures):
                local_features = future.result()
                for k, v in local_features.items():
                    features[k].extend(v)
                pbar.update(1)  # 每完成一个任务更新进度


if __name__ == "__main__":
    main()

参考链接:concurrent.futures — 启动并行任务

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐