因上努力

个人主页:丷从心·

系列专栏:Python基础

学习指南:Python学习指南

果上随缘


线程与进程的区别与联系

  • 线程是可以被计算机操作系统调度的最小单元
  • 进程是计算机分配资源(CPU、内存等)的最小单元
  • 一个进程中至少有一个线程,同一个进程中的线程共享进程中的资源

同步任务

  • 我们在此之前编写的代码都是同步代码,代码从上到下按顺序执行,如果前一个任务没有完成,那么不能运行之后的任务
示例
import time


def work_1():
    print('任务1...')

    time.sleep(2)


def work_2():
    print('任务2...')

    time.sleep(2)


start = time.time()

work_1()
work_2()

end = time.time()

print(f'总共用时: {end - start} s')
任务1...
任务2...
总共用时: 4.020323038101196 s
  • 可以看到整个程序用时 4 4 4秒,work_2()需要等待work_1()运行结束后才能运行

并行任务

  • 使用线程来运行上面的代码,能够优化运行时间
示例
import time
import threading


def work_1():
    print('任务1...')

    time.sleep(2)


def work_2():
    print('任务2...')

    time.sleep(2)


# 通过 Thread 类创建线程对象, 并使用 target 绑定线程对象要运行的任务
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)

start = time.time()

# 运行线程
t1.start()
t2.start()

t1.join()
t2.join()

end = time.time()

print(f'总共用时: {end - start} s')
任务1...
任务2...
总共用时: 2.0046136379241943 s
  • 可以看到整个程序用时 2 2 2秒,work_1()work_2并行运行
线程调度的“随机性”
  • 下面的示例可以看到操作系统调度线程时的“随机性”
import time
import threading


def work_1():
    for i in range(5):
        print('任务1...')

        time.sleep(2)


def work_2():
    for i in range(5):
        print('任务2...')

        time.sleep(2)


t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)

t1.start()
t2.start()
任务1...
任务2...
任务2...
任务1...
任务1...
任务2...
任务2...
任务1...
任务2...
任务1...
  • 可以看到任务 1 1 1和任务 2 2 2的调度顺序是我们无法确定的,是由操作系统的调度算法决定的

线程属性和方法

  • 在学习线程方法之前,我们需要知道Python程序是如何被运行的
    • 一个Python文件被解释器运行时会在操作系统中创建一个进程
    • 然后该进程会创建一个线程来运行文件中的代码,这个程序最初创建的线程称为主线程
    • 当主线程运行到t = threading.Thread()时会创建一个新的线程,称为子线程
    • 当前进程中的主线程与子线程由操作系统进行调度,并发地运行,具体如何调度线程由操作系统的调度算法决定
    • 子线程在运行时,主线程不会等待子线程,而是继续向下执行,直到执行到文件末尾没有代码时,主线程会等待子线程运行结束后再退出
thread_object.start()方法
  • t = threading.Thread()只是创建了一个线程,并不会运行线程代码

  • t.start()使线程t达到就绪状态,等待操作系统进行调度,具体何时调度由操作系统决定

  • 以上面的并行任务的代码为例,先注释掉t1.join()t2.join()

import time
import threading


def work_1():
    print('任务1...')

    time.sleep(2)


def work_2():
    print('任务2...')

    time.sleep(2)


# 通过 Thread 类创建线程对象, 并使用 target 绑定线程对象要运行的任务
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)

# 运行线程
start = time.time()

t1.start()
t2.start()

# t1.join()
# t2.join()

end = time.time()

print(f'总共用时: {end - start} s')
任务1...
任务2...
总共用时: 0.002573728561401367 s
  • 可以看到主线程没有等待子线程,而是继续向下执行
  • 当执行到end = time.time()时,此时end记录的时间是主线程运行到这行代码的时间
  • 之后运行print(f'总共用时: {end - start} s'),输出时间0.002573728561401367 s,此时执行到了文件末尾没有其他代码,主线程会等待子线程运行结束后再退出
  • 为了能正确记录线程运行的时间,我们需要让主线程等待子线程
thread_object.join()方法
  • t.join()使主线程等待子线程,子线程任务执行结束后主线程再继续向下执行
  • 仍然以上面的并行任务的代码为例,取消注释t1.join()t2.join()
import time
import threading


def work_1():
    print('任务1...')

    time.sleep(2)


def work_2():
    print('任务2...')

    time.sleep(2)


# 通过 Thread 类创建线程对象, 并使用 target 绑定线程对象要运行的任务
t1 = threading.Thread(target=work_1)
t2 = threading.Thread(target=work_2)

# 运行线程
start = time.time()

t1.start()
t2.start()

t1.join()
t2.join()

end = time.time()

print(f'总共用时: {end - start} s')
任务1...
任务2...
总共用时: 2.011659622192383 s
  • 可以看到主线程等待子线程运行结束后才继续向下执行,正确记录了子线程运行的时间
thread_object.daemon属性
  • 设置守护线程,需要在线程启动之前进行设置
  • 如果一个线程是守护线程,那么主线程运行到文件末尾后不论子线程任务是否结束都会自动退出
没有设置守护线程的情况
import time
import threading


def work():
    for i in range(5):
        print(i)

        time.sleep(1)


t = threading.Thread(target=work)
# t.daemon = True

t.start()

print('主线程即将退出...')
0
主线程即将退出...
1
2
3
4
设置守护线程的情况
import time
import threading


def work():
    for i in range(5):
        print(i)

        time.sleep(1)


t = threading.Thread(target=work)
t.daemon = True

t.start()

print('主线程即将退出...')
0
主线程即将退出...
  • 可以看到并没有继续输出 1 1 1 2 2 2 3 3 3 4 4 4,主线程就退出了
thread_object.current_thread()方法
  • thread_object.current_thread()方法用于获取当前线程对象的引用
  • 可以用来获取线程名称和线程号
import threading


def work():
    tid = threading.current_thread().ident
    name = threading.current_thread().name

    print(f'tid: {tid},\t name: {name}')


for i in range(5):
    t = threading.Thread(target=work)
    t.name = f'线程-{i}'

    t.start()
    t.join()
tid: 25380,	 name: 线程-0
tid: 18372,	 name: 线程-1
tid: 24616,	 name: 线程-2
tid: 3700,	 name: 线程-3
tid: 8836,	 name: 线程-4

GIL

  • Python中存在全局解释器锁,即GIL

  • GILCPython解释器独有的,主要的功能是让一个进程在同一时刻只有一个线程被运行

  • 例如在一个进程中创建了多个线程,在运行当前程序时,同一时刻只能有一个线程被运行,其他线程等待操作系统调度,这种情况下无法利用多核CPU的优势

  • 如果想要绕开GIL,那么可以使用多进程的方式,创建多个进程,使每个进程只有一个主线程,但是多进程消耗的资源比多线程的方式多

  • 所以如果任务是I/O密集型任务,优先使用多线程方式,如果任务是计算密集型任务,优先使用多进程方式


线程安全

  • 如果多个线程对同一个全局变量进行更新操作,会产生资源竞争问题,会导致数据不一致,计算结果出错
不可控的调度
import threading


def add_():
    global counter

    for i in range(1000000):
        counter += 1


def sub_():
    global counter

    for i in range(1000000):
        counter -= 1


counter = 0

t1 = threading.Thread(target=add_)
t2 = threading.Thread(target=sub_)

t1.start()
t2.start()

t1.join()
t2.join()

print(counter)
-742215
  • 可以看到线程t1num进行 1000000 1000000 1000000次加 1 1 1操作,线程t2num进行 1000000 1000000 1000000次减 1 1 1操作,预期的结果输出 0 0 0,而实际结果输出一个奇怪的数 − 742215 -742215 742215,多次重复运行会发现结果不尽相同
  • 这是由于加法和减法操作并不是一个“原子操作”,即在系统底层是使用多条汇编代码实现的,例如加 1 1 1操作的汇编代码如下
mov	0x8049a1c, %eax
add $0x1, %eax
mov %eax, 0x8049a1c
  • 这个例子假定变量counter指向内存地址0x8049a1c,先用mov指令,从内存地址处取出值,放入eax寄存器,然后给eax寄存器的值加 1 1 10x1),最后eax的值被存回内存中相同的地址
  • 如果操作系统调度线程t1运行到这段代码,它将counter的值(假设它这时是 0 0 0)加载到它的寄存器eax中,因此线程 1 1 1eax = 0,然后它向寄存器加 1 1 1,因此eax = 1,然后此时发生了时钟中断,操作系统将当前正在运行的线程(它的程序计数器、寄存器,包括eax等)的状态保存到线程t1TCB(线程控制块)中
  • 然后糟糕的事情发生了,线程t2被调度运行,并进入下面这段代码
mov	0x8049a1c, %eax
sub $0x1, %eax
mov %eax, 0x8049a1c
  • 线程t2执行了第一条指令,获取内存中counter的值并将其放入其eax中(每个线程都有自己的专用寄存器),此时counter的值仍为 0 0 0,因此线程t2eax = 0,假设线程t2执行接下来的两条指令,将eax 1 1 1(因此eax = -1),然后将eax的内容保存到 counter(地址0x8049a1c)中,全局变量counter现在的值是 − 1 -1 1

  • 然后,又发生一次上下文切换,线程t1恢复运行,它已经执行过movadd指令,现在准备执行最后一条mov指令,线程t1eax = 1,最后的mov指令执行,将值保存到内存,counter被设置为 1 1 1

  • 简单来说,就是执行了一次加 1 1 1操作,一次减 1 1 1操作,而counter被加了 1 1 1,而不是预期的counter = 0

  • 系统对线程的调度是无法在代码层面上进行控制的

  • 使用锁可以解决上述问题,锁的作用就是对加锁的代码执行原子操作,即要么代码全部运行,要么代码全不运行
示例
from threading import Thread, RLock

lock_obj = RLock()


def add_():
    global counter

    for i in range(1000000):
        lock_obj.acquire()  # 申请锁, 申请成功会让其他线程等待直到当前线程释放锁

        counter += 1

        lock_obj.release()  # 释放锁, 当锁被释放后其他等待的线程才能被正常运行


def sub_():
    global counter

    for i in range(1000000):
        lock_obj.acquire()  # 申请锁, 申请成功会让其他线程等待直到当前线程释放锁

        counter -= 1

        lock_obj.release()  # 释放锁, 当锁被释放后其他等待的线程才能被正常运行


counter = 0

t1 = Thread(target=add_)
t2 = Thread(target=sub_)

t1.start()
t2.start()

t1.join()
t2.join()

print(counter)
0
  • 可以看到此时counter的最终结果为 0 0 0
使用上下文管理器管理锁
  • 在上述代码中,我们手动申请锁与释放锁,RLock()对象支持上下文管理协议,可以使用with语句帮助我们申请和释放锁
from threading import Thread, RLock

lock_obj = RLock()


def add_():
    global counter

    for i in range(1000000):
        with lock_obj:
            counter += 1


def sub_():
    global counter

    for i in range(1000000):
        with lock_obj:
            counter -= 1


counter = 0

t1 = Thread(target=add_)
t2 = Thread(target=sub_)

t1.start()
t2.start()

t1.join()
t2.join()

print(counter)
0
RLock和Lock
  • 在线程中一般使用两种锁机制:RLockLock
Lock
  • Lock是同步锁,不支持锁嵌套,一般很少使用
from threading import Thread, Lock

lock_obj = Lock()


def add_():
    global counter

    for i in range(1000000):
        lock_obj.acquire()  # 申请锁, 申请成功会让其他线程等待直到当前线程释放锁
        lock_obj.acquire()

        counter += 1

        lock_obj.release()  # 释放锁, 当锁被释放后其他等待的线程才能被正常运行
        lock_obj.release()


def sub_():
    global counter

    for i in range(1000000):
        lock_obj.acquire()  # 申请锁, 申请成功会让其他线程等待直到当前线程释放锁
        lock_obj.acquire()

        counter -= 1

        lock_obj.release()  # 释放锁, 当锁被释放后其他等待的线程才能被正常运行
        lock_obj.release()


counter = 0

t1 = Thread(target=add_)
t2 = Thread(target=sub_)

t1.start()
t2.start()

t1.join()
t2.join()

print(counter)

  • 当嵌套使用同步锁Lock时,产生了死锁,程序会卡死
RLock
  • RLock是递归锁,支持锁嵌套
from threading import Thread, RLock

lock_obj = RLock()


def add_():
    global counter

    for i in range(1000000):
        lock_obj.acquire()  # 申请锁, 申请成功会让其他线程等待直到当前线程释放锁
        lock_obj.acquire()

        counter += 1

        lock_obj.release()  # 释放锁, 当锁被释放后其他等待的线程才能被正常运行
        lock_obj.release()


def sub_():
    global counter

    for i in range(1000000):
        lock_obj.acquire()  # 申请锁, 申请成功会让其他线程等待直到当前线程释放锁
        lock_obj.acquire()

        counter -= 1

        lock_obj.release()  # 释放锁, 当锁被释放后其他等待的线程才能被正常运行
        lock_obj.release()


counter = 0

t1 = Thread(target=add_)
t2 = Thread(target=sub_)

t1.start()
t2.start()

t1.join()
t2.join()

print(counter)
0
内置类型底层锁机制
  • 在多线程编程中,不是任何地方都需要加锁处理,在一些内置类型(例如列表)底层已经进行了加锁
import threading

data_list = list()


def list_add():
    for _ in range(100000):
        data_list.append(0)

    print(len(data_list))


for _ in range(2):
    t = threading.Thread(target=list_add)

    t.start()
100000
200000
  • 可以看到没有进行加锁,两个线程仍然正确向列表分别插入了 100000 100000 100000个元素
死锁
  • 在使用锁的过程中,发现如果嵌套使用同步锁,程序会卡死,实际上此时产生了死锁
  • 死锁是由于资源竞争而造成的一种堵塞现象
  • 线程 1 1 1持有锁 1 1 1而等待锁 2 2 2的释放,线程 2 2 2持有锁 2 2 2而等待锁 1 1 1的释放,此时产生了循环等待,产生了死锁
示例
import threading
import time

mutex_1 = threading.Lock()
mutex_2 = threading.Lock()


class MyThread_1(threading.Thread):
    def run(self):
        mutex_1.acquire()  # 对 mutex_1 上锁
        print(self.name + '---mutex_1-up---')

        time.sleep(1)  # mutex_1 上锁后, 延时 1 秒, 等待线程 2 对 mutex_2 上锁

        mutex_2.acquire()  # 此时会堵塞, 因为 mutex_2 已经被线程 2 抢先上锁了
        print(self.name + '---mutex_2-up---')

        mutex_2.release()  # 对 mutex_2 解锁
        mutex_1.release()  # 对 mutex_1 解锁


class MyThread_2(threading.Thread):
    def run(self):
        mutex_2.acquire()  # 对 mutex_2 上锁
        print(self.name + '---mutex_2-up---')

        time.sleep(1)  # mutex_2 上锁后, 延时 1 秒, 等待线程 1 对 mutex_1 上锁

        mutex_1.acquire()  # 此时会堵塞, 因为 mutex_1 已经被线程 1 抢先上锁了
        print(self.name + '---mutex_1-up---')

        mutex_1.release()  # 对 mutex_1 解锁
        mutex_2.release()  # 对 mutex_2 解锁


if __name__ == '__main__':
    t1 = MyThread_1()
    t2 = MyThread_2()

    t1.start()
    t2.start()
Thread-1---mutex_1-up---
Thread-2---mutex_2-up---
  • 由于产生了死锁,程序会卡死,不会终止

线程池

  • 线程对象的创建需要时间,在需要创建大量线程对象的时候会发生性能下降的情况
  • 使用线程池会创建出一定数量的线程对象,并且线程在执行完任务后不会被解释器销毁,这样下一个任务可以重复使用之前创建的这些线程对象
线程池的创建
import time
from concurrent.futures import ThreadPoolExecutor


def get_html(page):
    print(f'Successfully obtained page {page}...')

    time.sleep(1)

    return page


# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)

# 通过 submit 提交需要执行的函数到线程池中
task_1 = executor.submit(get_html, 1)
task_2 = executor.submit(get_html, 2)
Successfully obtained page 1...
Successfully obtained page 2...
done()方法
  • done()方法用于判断某个任务是否完成
import time
from concurrent.futures import ThreadPoolExecutor


def get_html(page):
    print(f'Successfully obtained page {page}...')

    time.sleep(1)

    return page


# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)

# 通过 submit 提交需要执行的函数到线程池中
task_1 = executor.submit(get_html, 1)
task_2 = executor.submit(get_html, 2)

# done() 方法用于判断某个任务是否完成
print(f'task_1 完成情况: {task_1.done()}')
Successfully obtained page 1...
Successfully obtained page 2...
task_1 完成情况: False
cancel()方法
  • cancel()方法用于取消未运行的任务,已经运行的任务无法被取消
import time
from concurrent.futures import ThreadPoolExecutor


def get_html(page):
    print(f'Successfully obtained page {page}...')

    time.sleep(1)

    return page


# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=1)

# 通过 submit 提交需要执行的函数到线程池中
task_1 = executor.submit(get_html, 1)
task_2 = executor.submit(get_html, 2)

# cancel() 方法用于取消未运行的任务, 已经运行的任务无法被取消
print(f'task_2 任务取消: {task_2.cancel()}')
Successfully obtained page 1...
task_2 任务取消: True
  • 通过将max_workers的值修改为 1 1 1,使得task_2未能运行时就被取消
result()方法
  • submit()方法的返回值是一个future对象
  • 通过对future对象调用result()方法可以获取任务的返回值
import time
from concurrent.futures import ThreadPoolExecutor


def get_html(page):
    print(f'Successfully obtained page {page}...')

    time.sleep(1)

    return page


# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)

# 通过 submit 提交需要执行的函数到线程池中
task_1 = executor.submit(get_html, 1)
task_2 = executor.submit(get_html, 2)

# 通过对 future 对象调用 result() 方法获取任务的返回值
print(f'task_1 返回结果: {task_1.result()}')
print(f'task_2 返回结果: {task_2.result()}')
Successfully obtained page 1...
Successfully obtained page 2...
task_1 返回结果: 1
task_2 返回结果: 2
as_completed()方法
  • as_completed()方法用于获取已经执行成功的任务的返回值
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_html(page):
    print(f'Successfully obtained page {page}...')

    time.sleep(1)

    return page


# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)

page_list = [1, 2, 3, 4]

# 批量提交任务并获取已经执行成功的任务的返回值
all_tasks = [executor.submit(get_html, page) for page in page_list]

# 只要线程任务执行完就能获取到返回值, 完成一个任务获取一个任务的返回值
for future in as_completed(all_tasks):
    data = future.result()

    print(f'Get data {data}')
Successfully obtained page 1...
Successfully obtained page 2...
Successfully obtained page 3...
Get data 1
Successfully obtained page 4...
Get data 2
Get data 4
Get data 3
map()方法
  • map()方法用于提交任务并获取已经执行成功的任务的返回值
import time
from concurrent.futures import ThreadPoolExecutor


def get_html(page):
    print(f'Successfully obtained page {page}...')

    time.sleep(1)

    return page


# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)

page_list = [1, 2, 3, 4]

# map() 方法用于提交任务并获取已经执行成功的任务的返回值
for data in executor.map(get_html, page_list):
    print(f'Get data {data}')  # 打印的返回值顺序与列表顺序一致
Successfully obtained page 1...
Successfully obtained page 2...
Successfully obtained page 3...
Successfully obtained page 4...
Get data 1
Get data 2
Get data 3
Get data 4
wait()方法
  • wait()方法用于使主线程堵塞,直到指定任务完成后,主线程才解堵塞
import time
from concurrent.futures import ThreadPoolExecutor, wait


def get_html(page):
    print(f'Successfully obtained page {page}...')

    time.sleep(1)

    return page


# 创建线程池对象
executor = ThreadPoolExecutor(max_workers=2)

page_list = [1, 2, 3, 4]

# 批量提交任务并获取已经执行成功的任务的返回值
all_tasks = [executor.submit(get_html, page) for page in page_list]

# wait() 方法用于使主线程堵塞, 直到指定任务完成后, 主线程才解堵塞
wait(all_tasks)

print('主线程解堵塞, 执行剩余代码...')
Successfully obtained page 1...
Successfully obtained page 2...
Successfully obtained page 3...
Successfully obtained page 4...
主线程解堵塞, 执行剩余代码...

自定义线程类

  • 自定义线程类需要继承Thread
  • 需要重写Thread类中的run()方法,用于运行线程任务
示例
import requests
import threading


class ThreadSpider(threading.Thread):
    def __init__(self, url):
        super().__init__()

        self.url = url

    def run(self):
        response = requests.get(self.url).content

        file_name = self.url.split('/')[-1]
        with open(file_name, 'wb') as f:
            f.write(response)

        print('下载完成...')


url_list = [
    'http://pic.bizhi360.com/bbpic/98/10798.jpg',
    'http://pic.bizhi360.com/bbpic/92/10792.jpg',
    'http://pic.bizhi360.com/bbpic/86/10386.jpg'
]

for url in url_list:
    thread_spider = ThreadSpider(url)

    thread_spider.start()
下载完成...
下载完成...
下载完成...

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐