45.Python多进程
文章目录1.模块1.2.Process类1.3参数:1.4属性介绍1.5绑定方法2.开启子进程的两种方式2.1方式12.2方式23.进程数据隔离4.方法及属性4.1 .join()方法4.2 进程状态4.3进程名称与PID4.4守护进程5.进程同步与互斥锁5.1 抢票模拟6.队列6.1队列方法6.2生产消费模型7.信号量8.Event时间1.模块Python中的多线程无法利用多核优势,可以通过去他
·
1. 开启子进程
1.1 模块介绍
Python中的多线程无法利用多核优势,可以通过去他的模块去实现(不常用).
Python中大部分情况下需要使用多进程, 内置multiprocessing模块, 开启子进程.
1.2.Process类
Process时multiprocessing模块中创建进程的类, 由该类实例化得到对象(一个子进程的).
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):
1.3 参数
target: 表示调用对象, 子进程要执行的函数.
args: 表示调用对象的位置参数(元组).
kwargs: 表示调用对象的关键字参数(字典).
name: 表示子进程的名称.
1.4 属性介绍
.name 进程的名称
.pid 进程的pid号
.daemon 默认值为False, 设置为True, 设置子进程为守护进程,
父进程结束, 子进程也结束, 需要在.start前设置.
.exitcode 进程运行时间为None,如果为-N, 表示被信号N结束.
.authkey 进程的身份验证, 默认由os.usandom()随机生成32个字符组成的字符串.
1.5 绑定方法
.start() 启动子进程, 并调用子进程的.run方法.
.run() 进程启动时运行的方法, 正是它去调用target指定的函数, 自定义的类中一定要实现改方法.
.terminate() 强制终止进程, 不会进行任何清理.
.isalive() 查看子进程是否存活, 返回值为布尔值.
.join() 主进程程等待子进程的结束.
2. 开启子进程的两种方式
在windows系统中Process()必须放在if__name__ = '__main__':下.
在windows 下创建进程类似于模块导入的方式, 会从上往下运行代码, 不放在main内会出现循环导入问题.
2.1 方式1
* 1. 定义需要执行的函数.
* 2. Process类生成对象时, 将需要调用的函数的名称. Process调用父类的初始化方法... self._target = target
* 3. 子进程对象.start()启子进程, 程序在这里一分为二, 每个进程有自己的代码要执行.
* 4. .start()方法会调用.run()方法, .run()方法会调用self._target加括号调用函数执行.
from multiprocessing import Process
# 子进程执行的函数
def func():
print('我是一个子进程')
if __name__ == '__main__':
# 设置子进程执行的函数, 实例化一个子进程的对象.
p = Process(target=func)
# 启动子进程
p.start() # 启动子进程, 程序在这里一分为二
print('我是主进程')
终端:
我是主进程 # 先展示
我是一个子进程 # 后展示
____________________________________________
为什么先显示 我是主进程 再显示 我是一个子进程 ?
开启子进程要一些时间, 主进程也会继续往下执行.
如果启动子进程的速度较快的话, 顺序可能就不一样了.
2.2 方式2
* 1. 自定义生成进程类, 继承Process类, 调用父类的初始化方法, 生成子进程对象.
* 2. 编写run方法(必须), 将子进程需要执行的代码写在run方法中.
* 3. 使用自定义类生成子进程对象.
* 4. 子进程对象.start() 启动子进程,
from multiprocessing import Process
# 继承Process类, 子定义生成子进程的类
class func(Process):
# 调用Process类的方法生成子进程对象
def __init__(self):
super().__init__()
# 定义run方法(在run方法中写需要子进程执行的代码)
def run(self):
print('我是一个子进程')
if __name__ == '__main__':
# 使用自定义的类生成子进程对象
p = func()
# 启动子进程
p.start()
print('我是主进程')
2.3 注意事项
from multiprocessing import Process
# 定义子进程调用的函数
def func1():
print('我是子进程!')
# 主程序
if __name__ == '__main__':
# 生成子进程
p1 = Process(target=func1)
p1.start()
print(12)
# 写在外面 每开启一个进程就执行一次, 一定要写在if __name__ == '__main__':中
# 每开启一个进程就导入一下一个文件的内容... 所以就执行一次
终端显示:
12
12
我是子进程
3. 进程之间数据隔离
进程与进程之间的数据是隔离的, 某个进程修改数据, 改动的仅仅自己进程内的数据, 不会影响其他的进程
from multiprocessing import Process
# 导入时间模块
import time
# 定义一个全局变量
x = 10
# 定义子进程调用的函数
def task():
# 在外层名称空间中定义一个变量x, x为全局变量
global x
x = 0 # 修改值全局变量x为0.
if __name__ == '__main__':
# 生成子进程对象
p = Process(target=task)
# 启动子进程
p.start()
# 延时3秒, 延时的目的是, 为了等待子进程先执行
time.sleep(3)
# 打印全局变量x的值
print(x) # 10
终端打印的值是10, 而不是0.
运行子进程会单独使用一个内存空间, 与原来的进程使用的内存空间是隔离的.
4. 方法及属性
4.1 .join()方法
.join() 设置主进程等待子进程结束, 先执行子进程, 后执行主进程.
from multiprocessing import Process
import time
def task():
print('我是子进程!')
if __name__ == '__main__':
p = Process(target=task)
p.start()
# time.sleep(0.2) # 自己电脑下差距 0.1秒 - 0.2 之间的,
print('我只主进程!')
终端显示:
我只主进程!
我是子进程!
from multiprocessing import Process
# 定义子进程调用的函数
def func():
print('我是子进程!')
# 主程序
if __name__ == '__main__':
# 生成子进程
p = Process(target=func)
# 执行子进程
p.start()
# 主进程阻塞, 等待子进程执行完毕在执行
p.join()
print('我只主进程!')
终端显示:
我是子进程!
我只主进程!
from multiprocessing import Process
# 定义子进程调用的函数
def func1():
print('我是子进程!')
# 主程序
if __name__ == '__main__':
# 生成5个子进程
p1 = Process(target=func1)
p1.start()
print(12) # 写在外面 每开启一个进程就执行一次, 一定要写在if __name__ == '__main__':中
4.2 多个进程使用join
多进程中得带子进程结束在执行主进程.
from multiprocessing import Process
# 定义子进程调用的函数
def func(i):
print(f'我是子进程{i}')
# 主程序
if __name__ == '__main__':
# 定义一个列表存放子进程对象
p_list = []
# 生成5个子进程
for i in range(5):
p = Process(target=func, args=(i,))
p.start()
# 将子进程添加到列表中
p_list.append(p)
# 开启五个线程后, 为每个五个子进程执行.join()方法
for p in p_list:
p.join()
print('主进程')
for i, p in enumerate(p_list):
# 添加这局查看情况
print(f'执行子进程{i}.join方法')
p.join()
print('主进程')
4.3 进程状态
.terminate() 强制终止进程, 不会进行任何清理.
.isalive() 查看子进程是否存活, 返回值为布尔值.
from multiprocessing import Process
import time
def task():
print('我是子进程!')
time.sleep(3)
if __name__ == '__main__':
p = Process(target=task)
p.start()
# 发送开启进程的信号后, 就马上发送杀死进程的信号.
p.terminate()
print('我只主进程!')
# 系统需要一点时间处理那两个信号, 此时可能True.
print(p.is_alive())
time.sleep(0.1)
print(p.is_alive()) # False
终端显示:
我只主进程!
True
False
__________________________________________________________
原本主进程就比子进程的运行速度快, 没等子进程完全启动就杀死子进程.
4.4 进程名称, 进程标识符
实例化对象时, 设置参数name可用修改进程的名称.
.name 查看进程的名称.
pid 指进程识别号, 也称进程标识符.
.pid 查看进程的pid号.
* 进程对象的方法, .name 与 .pid 查看的是子进程的信息.
from multiprocessing import Process
def task():
print('我是子进程!')
if __name__ == '__main__':
# 设置进程的名称
p = Process(target=task, name='kid')
p.start()
# 子进程的名称
print(p.name)
# 子进程的pid号
print(p.pid)
终端显示:
kid
1366
我是子进程!
4.5 os模板查看进程号
os.get 查看当前进程pid号.
os.getppid 查看当前进程的父进程pid号.
from multiprocessing import Process
import os
def task():
print('子进程中显示 当前进程pid号:%s' % os.getpid())
print('子进程中显示 父进程pid号:%s' % os.getppid())
if __name__ == '__main__':
p = Process(target=task, name='kid')
p.start()
print('主进程中显示 父进程pid号:%s' % os.getppid())
print('主进程中显示 子进程pid号:%s' % p.pid)
print('主进程中显示 当前进程pid号:%s' % os.getpid())
终端显示:
主进程中显示 父进程pid号:13848
主进程中显示 子进程pid号:14476
主进程中显示 当前进程pid号:13664
子进程中显示 当前进程pid号:14476
子进程中显示 父进程pid号:13664
4.6 守护进程
守护进程: 当主进程结束了子进程立刻结束.
生成子进程将使用.deamon=True 方法 将子进程进程对象, 设置为守护进程.
注意点:
* 1. 子进程中无法在创建守护进程
* 2. 在开启进程前设置.
from multiprocessing import Process
def task():
print('我是子进程!')
if __name__ == '__main__':
p = Process(target=task)
# 设置守护进程
p.daemon = True
p.start()
print('我是主进程')
终端显示:
我是主进程
from multiprocessing import Process
import time
# 子进程
def task():
# 打印123
print(123)
# 延时1秒
time.sleep(1)
# 打印end123
print("end123")
if __name__ == '__main__':
# 生成子进程对象
p1 = Process(target=task)
# 开启进程守护
p1.daemon = True
# 开启子进行
p1.start()
# 延时0.5秒(让子进行执行)
time.sleep(0.5)
# 打印main-------
print("main-------")
终端显示:
123
main-------
____________________________________________________________________
先让子进程执行一会, 主进程结束后, 没有时间让子进行执行完, 就杀死子进程了.
5. 进程同步与互斥锁
5.1 共享系统资源
进程之间数据是不共享, 但是共享同一个操作系统,
所以访问同一个文件, 或同一个打印终端都是可以的执行.
不过在执行顺序可能会出现错乱.
共享终端案例: 每一次运行结果都不相同, 那个进程跑得快谁先运行.
from multiprocessing import Process
# 定义子进程执调用的函数
def work(i):
print('%s is running' % i)
print('%s is done' % i)
if __name__ == '__main__':
# 开启三个子进程
for i in range(3):
# 传递参数 i
p = Process(target=work, args=(i,))
# 开启子进程
p.start()
终端显示:
0 is running
0 is done
2 is running
2 is done
1 is running
1 is done
5.2 进程的异步性
在多道批处理系统中, 多个进程是可以并发执行的, 但由于系统的资源有限, 进程的执行不是一贯到底的,
而是走走停停, 以不可预知的速度向前推进, 这就是进程的异步性.
进程的异步性会带来什么问题呢?
举个例子, 如果有 A、B 两个进程分别负责读和写数据的操作,
这两个线程是相互合作、相互依赖的.那么写数据应该发生在读数据之前.
而实际上, 由于异步性的存在, 可能会发生先读后写的情况,
而此时由于缓冲区还没有被写入数据, 读进程 A 读取空数据.
* 先在目录下新建a.txt, 否则读进程先运行的话, 没有a.txt文件可能会报错.
from multiprocessing import Process
# 往文件中写入数据
def write():
with open('a.txt', mode='wt', encoding='utf8') as wf:
print('写进程, 往文件中写入数据!')
wf.write('hello world!')
# 读取文件数据
def read():
with open('a.txt', mode='r+t', encoding='utf8') as f:
print('读进程, 往文件中读取数据!')
res = f.read()
"""
truncate()方法用于截断文件, 如果指定了可选参数 size,
则表示截断文件为 size 个字符, 截断之后 size 后面的所有字符被删除.
"""
# 清空文本
f.truncate(0)
print(f'读进程, {res}')
if __name__ == '__main__':
# 函数列表
func_list = [write, read]
# 开启子进程
for func in func_list:
# 生成子进程对象
p = Process(target=func)
# 执行子进行
p.start()
5.4 进程同步
从上面的例子我们能看出, 一个进程的执行可能影响到另一个进程的执行.
进程同步(synchronization)就是指协调这些完成某个共同任务的并发线程,
在某些位置上指定线程的先后执行次序、传递信号或消息.
再举个生活中的进程同步的例子, 你想要喝热水, 于是你打了一壶水开始烧,
在这壶水烧开之前, 你只能一直等着, 水烧开之后水壶自然会发生响声提醒你来喝水, 于是你就可以喝水了.
就是说水烧开这个事情必须发生在你喝水之前.
注意不要把进程同步和进程调度搞混了:
进程调度是为了最大程度的利用 CPU 资源, 选用合适的算法调度就绪队列中的进程.
进程同步是为了协调一些进程以完成某个任务,
比如读和写, 你肯定先写后读, 不能先读后写吧, 这就是进程同步做的事情了,
指定这些进程的先后执行次序使得某个任务能够顺利完成.
5.5 进程互斥
因为进程的并发性, 并发执行的线程不可避免地需要共享一些系统资源.
进程互斥(mutual exclusion)就是用来解决这个问题的.
当某个进程 A 启用摄像头, 如果另一个进程 B 也想启用摄像, 它就必须等待, 直到 A 进程使用结束并释放测试资源后,
B 进程才能去访问.(微信视屏通过, 支付宝无法扫码...)
在一个时间段内只允许一个进程使用的资源, 这也就是互斥的意思, 也将其称为临界资源,
对临界资源进行访问的那段代码称为临界区.
5.6 互斥锁
Lock进程互斥锁:解决为进程抢占资源, 出现错乱的结果,
可以给进程上锁,在一个进程执行时给他上锁, 执行结束后释放锁, 给下一个进程使用,
同一个时刻只有一个进程在执行, 解决资源抢占问题但是降低了效率.
开启了多个进程, 那个进行先抢到锁就新执行, 其他进程代码锁的释放, 再次抢锁...
导入 Lock 模块:
from multiprocessing import Lock
使用:
* 1. lock = Lock() 实例化一个对象.
* 2. 在进程中调用函数中使用lock的方法:
lock.acquire() 上锁
lock.release() 释放锁
from multiprocessing import Process, Lock
import time
def task(i, lock):
# 上锁
lock.acquire()
print('%s is running' % i)
time.sleep(2)
print('%s is done' % i)
# 释放锁
lock.release()
if __name__ == '__main__':
# 生成lock互斥锁对象
lock = Lock()
for i in range(3):
# 将lock传递到函数中
p = Process(target=task, args=(i, lock))
p.start()
终端显示:
1 is running
1 is done
0 is running
0 is done
2 is running
2 is done
5.7 抢票模拟(不上锁)
open('db.txt', 'w') 返回一个文件句柄(文件对象), 没有绑定给变量,
相当于一次性使用后就被删除了, 不需要自己关闭文件句柄.
# 当前目录下创建一个文件db.txt json文件格式, 设置一张车票.
{"count": 1}
from multiprocessing import Process
import time, json
# 查票
def search(i):
ticket = json.load(open('db.txt'))
print('%s查看车票剩余:%s张!' % (i, ticket['count']))
return ticket
# 抢票
def rob(ticket, i):
if ticket['count'] > 0:
ticket['count'] -= 1
time.sleep(0.2) # 网络延时
json.dump(ticket, open('db.txt', 'w'))
print('%s抢票成功' % i)
else:
print('%s没有抢到票' % i)
# 合并到一个函数中
def func(i):
ticket = search(i)
rob(ticket, i)
if __name__ == '__main__':
# 生成5个子进程模拟抢票
for i in range(5):
p = Process(target=func, args=(i,))
p.start()
# 终端显示:
0查看车票剩余:1张!
1查看车票剩余:1张!
2查看车票剩余:1张!
3查看车票剩余:1张!
4查看车票剩余:1张!
1抢票成功
0抢票成功
2抢票成功
4抢票成功
3抢票成功
____________________________________________________________________
第一个进程抢到票后, 还没有来得及修改文件, 其他的进程都查询剩余的票数为1.
5.8 抢票模拟(上锁)
from multiprocessing import Process, Lock
import time, json
# 查票
def search(i):
ticket = json.load(open('db.txt'))
print('%s查看车票剩余:%s张!' % (i, ticket['count']))
return ticket
# 抢票
def rob(i, ticket):
if ticket['count'] > 0:
ticket['count'] -= 1
time.sleep(0.2) # 模拟网络延时
json.dump(ticket, open('db.txt', 'w'))
print('%s抢票成功' % i)
else:
print('%s没有抢到票' % i)
def func(i, lock):
# 上锁
lock.acquire()
ticket = search(i)
rob(i, ticket)
# 释放锁
lock.release()
if __name__ == '__main__':
# 生成互斥锁对象
lock = Lock()
# 生成5个子进程模拟抢票
for i in range(5):
p = Process(target=func, args=(i, lock))
p.start()
终端显示:
0查看车票剩余:1张!
0抢票成功
1查看车票剩余:0张!
1没有抢到票
3查看车票剩余:0张!
3没有抢到票
4查看车票剩余:0张!
4没有抢到票
2查看车票剩余:0张!
2没有抢到票
______________________________________________________________________
加锁保证多个进程, 同时只能有一个进程使用临界资源, 多个进程变成串行运行.
6. 进程间通行
6.1 进程间互相通信方法
进程之间数据互相隔离的, 要实现进程之间通信可以使用multiprocessing模块提供两种方法.
* 1. 队列
* 2. 管道
队列和管道的数据都是将数据存放于内存中.
6.2 队列的使用
队列是基于管道加锁实现的, 使用队列来解决数据共享问题.
队列的特性:先进先出.
创建共享的队列:
队列对象 = Queue([maxsize])
Queue能安全的实现多进程之间的数据传递.
maxsize 是队列运行的最大项数, 省略则为无限制.
.put()方法 将数据插入到队列中.
可选参数:
blocked: 默认为True, 为False 队列满了立刻抛出异常, Queue.Full.
timeout: 设置一个阻塞时间, 在这个时间内, 队列没有有剩余空间, 抛出异常, Queue.Full.
timeout=3, 3秒内未有剩余空间, 抛出异常.
.get()方法 从队列中读取一个元素(取出后队列中删除).
可选参数:
blocked: 默认为True, 为False 队列空了立刻抛出异常, Queue.Full. 如果是队列中最后一个值则放回该值.
timeout: 设置一个阻塞时间, 在这个时间内未收到值, 抛出异常, Queue.Full.
timeout=3, 3秒内未有传入值就抛出异常.
.put_nowait()方法 等同于 .put(False)
.get_nowait()方法 等同于 .get(False)
.empty() 队列为空返回True.
.full() 队列满了返回True.
.qsize() 队列现在的项目数量.
上诉三个方法不可靠,
在返回结果的同时, 程序还在运行, 刚返回结果, 有别的程序立刻处理了结果, 返回的值和现在的值又不一样了.
from multiprocessing import Process, Queue
# 获取队列中的数据
def task(q):
print(q.get())
print(q.get())
if __name__ == '__main__':
# 生成队列
q = Queue(3)
# 主进程往队列中写入数据
q.put('a')
q.put('b')
q.put('c')
# 队列满了返回True
print(q.full())
# 队列现在的项目数量
print(q.qsize())
# 创建子进程对象
p = Process(target=task, args=(q,))
# 启动子进程
p.start()
终端显示:
True
3
a
b
6.3 生产消费模型
生产数据的--> 比喻为生产者, 使用数据的--> 比喻为消费者.
生产者处理速度快, 消费者处理速度慢, 则生成者必须等待消费者处理, 有位置存放, 才能继续数据生产.
消费者处理速度快, 生产者处理速度慢, 则消费者必须等待生产者生产, 有了数据后, 才能继续进行处理.
生产者与消费者通过一个容器来解决生产者和消费者的强耦合问题,
生产者和消费者之间不直接通信, 而是通过阻塞队列进行通信,
生产者将数据存进阻塞队列中, 消费者从阻塞队列中取数据.
from multiprocessing import Process, Queue
import time
# 生产者
def production(q, name, food):
for i in range(10):
date = ('%s生产了第%s个%s.' % (name, i, food))
date1 = ('%s生产的第%s个%s.' % (name, i, food))
# 将生产的包子存到队列中
q.put(date1)
print(date)
# 消费者
def consumption(q, name):
for i in range(10):
time.sleep(1)
# 从队列中取出包子吃
print('%s 吃了 %s' % (name, q.get()))
if __name__ == '__main__':
# 创建队列
q = Queue(2)
# 创建生产者子进程
p1 = Process(target=production, args=(q, 'kid', '包子'))
# 开始生产
p1.start()
# 创建消费者子进程
p2 = Process(target=consumption, args=(q, 'qz'))
# 开始消费
p2.start()
7. 信号量
互斥锁(线程锁)同时只允许一个线程更改数据, (一把锁).
信号量(Semahpore)同时允许一定数量的线程更改数据, (多把锁).
信号量: 是一个变量, 控制着对公共资源或者临界区的访问.
信号量维护着一个计数器, 指定可同时访问资源或者进入临界区的线程数.
每次有一个线程获得信号量时, 计数器-1.
若计数器为0, 其他线程就停止访问信号量, 直到另一个线程释放信号量.
from multiprocessing import Process, Semaphore
import time
# 子进程调用的函数
def task(lock, i):
# 加信号量锁
lock.acquire()
print('%s号程序开始执行!' % i)
time.sleep(2)
print('%s号程序执行完毕!' % i)
time.sleep(2)
# 释放信号量锁
lock.release()
if __name__ == '__main__':
# 创建信号量对象, 看成是两把锁
lock = Semaphore(2)
# 创建10个子进程
for i in range(10):
p = Process(target=task, args=(lock, i))
p.start()
终端显示:
1号程序开始执行!
9号程序开始执行!
1号程序执行完毕!
9号程序执行完毕!
4号程序开始执行!
0号程序开始执行!
4号程序执行完毕!
0号程序执行完毕!
8号程序开始执行!
7号程序开始执行!
8号程序执行完毕!
7号程序执行完毕!
3号程序开始执行!
6号程序开始执行!
3号程序执行完毕!
6号程序执行完毕!
5号程序开始执行!
2号程序开始执行!
5号程序执行完毕!
2号程序执行完毕!
_____________________________________________________________________
开启十个子进程, 每个两个进程抢到锁开启执行程序, 释放信号量后, 剩下的进程抢锁
8. Event事件
8.1 介绍
Event事件: 事件处理的机制, 通过标志为, 控制全部进程进入阻塞状态,
也可以通过控制标志位,解除全部进程的阻塞.
注意:定义的事件对象, 默认状态是阻塞.
实现原理:
全局定义了一个内置标志Flag,
如果Flag值为 False, 那么当程序执行 event.wait方法时就会阻塞,
如果Flag值为True, 那么event.wait 方法时便不再阻塞.
8.2 常用方法
set(): 将标志设为True, 并通知所有处于等待阻塞状态的线程恢复运行状态.
clear(): 将标志设为False.
wait(timeout): 如果标志为True将立即返回, 否则阻塞线程至等待阻塞状态, 等待其他线程调用set().
isSet(): 获取内置标志状态, 返回True或False.
8.3 实例
from multiprocessing import Process, Event
import time
# 发红包
def teacher(event):
print('老师准备发红包了!')
time.sleep(3)
print('红包发出去了!')
# 发送信号
event.set()
# 抢红包
def student(event):
print('同学们掏出了手机准备抢红包!')
# 等待信号
event.wait()
print('同学抢到了')
if __name__ == '__main__':
# 创建事件对象
event = Event()
# 创建抢红包子进程
p1 = Process(target=student, args=(event,))
# 开启抢红包
p1.start()
# 创建发红包子进程
p2 = Process(target=teacher, args=(event,))
# 发红包
p2.start()
终端显示:
同学们掏出了手机准备抢红包!
老师准备发红包了!
红包发出去了!
同学抢到了

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)