使用案例:使用协程做离散事件仿真

本节我会说明如何只使用协程和标准库中的对象实现一个特别简单的仿
真系统。在计算机科学领域,仿真是协程的经典应用。第一门面向对象
的语言 Simula 引入了协程这个概念,目的就是为了支持仿真。

下述仿真示例不是为了做学术研究。协程是 asyncio 包的基
础构建。通过仿真系统能说明如何使用协程代替线程实现并发的活
动,而且对理解第 18 章讨论的 asyncio 包有极大的帮助。

分析示例之前,先简单介绍一下仿真。

离散事件仿真简介

离散事件仿真(Discrete Event Simulation,DES)是一种把系统建模成一
系列事件的仿真类型。在离散事件仿真中,仿真“钟”向前推进的量不是
固定的,而是直接推进到下一个事件模型的模拟时间。假如我们抽象模
拟出租车的运营过程,其中一个事件是乘客上车,下一个事件则是乘客
下车。不管乘客坐了 5 分钟还是 50 分钟,一旦乘客下车,仿真钟就会
更新,指向此次运营的结束时间。使用离散事件仿真可以在不到一秒钟
的时间内模拟一年的出租车运营过程。这与连续仿真不同,连续仿真的
仿真钟以固定的量(通常很小)不断向前推进。

显然,回合制游戏就是离散事件仿真的例子:游戏的状态只在玩家操作
时变化,而且一旦玩家决定下一步怎么走了,仿真钟就会冻结。而实时游戏则是连续仿真,仿真钟一直在运行,游戏的状态在一秒钟之内更新
很多次,因此反应慢的玩家特别吃亏。

这两种仿真类型都能使用多线程或在单个线程中使用面向事件的编程技
术(例如事件循环驱动的回调或协程)实现。可以说,为了实现连续仿
真,在多个线程中处理实时并行的操作更自然。而协程恰好为实现离散
事件仿真提供了合理的抽象。SimPy 是一个实现离散事件仿真的
Python 包,通过一个协程表示离散事件仿真系统中的各个进程。

在仿真领域,进程这个术语指代模型中某个实体的活动,与
操作系统中的进程无关。仿真系统中的一个进程可以使用操作系统
中的一个进程实现,但是通常会使用一个线程或一个协程实现。

如果对仿真感兴趣,值得研究一下 SimPy。不过,在这一节我会说明如
何只使用标准库提供的功能实现一个特别简单的离散事件仿真系统。我
的目的是增进你对使用协程管理并发操作的感性认知。若想理解下一节
所讲的内容,要仔细研究,不过这一付出能得到很大回报,让我们洞悉
asyncio、Twisted 和 Tornado 等库是如何在单个线程中管理多个并发活
动的。

出租车队运营仿真

仿真程序 taxi_sim.py 会创建几辆出租车,每辆车会拉几个乘客,然后回
家。出租车首先驶离车库,四处徘徊,寻找乘客;拉到乘客后,行程开
始;乘客下车后,继续四处徘徊。

四处徘徊和行程所用的时间使用指数分布生成。为了让显示的信息更加
整洁,时间使用取整的分钟数,不过这个仿真程序也能使用浮点数表示
耗时。 每辆出租车每次的状态变化都是一个事件。图 16-3 是运行这
个程序的输出示例。

运行 taxi_sim.py 创建 3 辆出租车的输出示例。-s 3 参数设
置随机数生成器的种子,这样在调试和演示时可以重复运行程序,
输出相同的结果。不同颜色的箭头表示不同出租车的行程

图 16-3 中最值得注意的一件事是,3 辆出租车的行程是交叉进行的。那
些箭头是我加上的,为的是让你看清各辆出租车的行程:箭头从乘客上车时开始,到乘客下车后结束。有了箭头,能直观地看出如何使用协程
管理并发的活动。

图 16-3 中还有几件事值得注意。

  • 出租车每隔 5 分钟从车库中出发。
  • 0 号出租车 2 分钟后拉到乘客(time=2),1 号出租车 3 分钟后拉
    到乘客(time=8),2 号出租车 5 分钟后拉到乘客(time=15)。
  • 0 号出租车拉了两个乘客(紫色箭头):第一个乘客从 time=2 时
  • 上车,到 time=18 时下车;第二个乘客从 time=28 时上车,到
    time=65 时下车——这是此次仿真中最长的行程。
  • 1 号出租车拉了四个乘客(绿色箭头),在 time=110 时回家。
  • 2 号出租车拉了六个乘客(红色箭头),在 time=109 时回家。这
    辆车最后一次行程从 time=97 时开始,只持续了一分钟。
  • 1 号出租车的第一次行程从 time=8 时开始,在这个过程中 2 号出
    租车离开了车库(time=10),而且完成了两次行程(那两个短的
    红色箭头)。
  • 在此次运行示例中,所有排定的事件都在默认的仿真时间内(180
    分钟)完成;最后一次事件发生在 time=110 时。

仿真结束时可能还有未完成的事件。如果是这种情况,最后一条消息会
是下面这样:

*** end of simulation time: 3 events pending ***

taxi_sim.py 脚本的完整代码在示例 A-6 中,本章只会列出与协程相关的
部分。真正重要的函数只有两个:taxi_process(一个协程),以及
执行仿真主循环的 Simulator.run 方法。

示例 16-20 是 taxi_process 函数的代码。这个协程用到了别处定义的
两个对象:compute_delay 函数,返回单位为分钟的时间间隔;Event
类,一个 namedtuple,定义方式如下:

Event = collections.namedtuple('Event', 'time proc action')

在 Event 实例中,time 字段是事件发生时的仿真时间,proc 字段是出
租车进程实例的编号,action 字段是描述活动的字符串。

下面逐行分析示例 16-20 中的 taxi_process 函数。
示例 16-20 taxi_sim.py:taxi_process 协程,实现各辆出租车的
活动

def taxi_process(ident, trips, start_time=0):"""每次改变状态时创建事件,把控制权让给仿真器"""
  time = yield Event(start_time, ident, 'leave garage')for i in range(trips): ➌
    time = yield Event(time, ident, 'pick up passenger') ➍
    time = yield Event(time, ident, 'drop off passenger')yield Event(time, ident, 'going home')# 出租车进程结束 ➐

❶ 每辆出租车调用一次 taxi_process 函数,创建一个生成器对象,
表示各辆出租车的运营过程。ident 是出租车的编号(如上述运行示例
中的 0、1、2);trips 是出租车回家之前的行程数量;start_time
是出租车离开车库的时间。

❷ 产出的第一个 Event 是 ‘leave garage’。执行到这一行时,协程
会暂停,让仿真主循环着手处理排定的下一个事件。需要重新激活这个
进程时,主循环会发送(使用 send 方法)当前的仿真时间,赋值给
time。

❸ 每次行程都会执行一遍这个代码块。
❹ 产出一个 Event 实例,表示拉到乘客了。协程在这里暂停。需要重新激活这个协程时,主循环会发送(使用 send 方法)当前的时间。
❺ 产出一个 Event 实例,表示乘客下车了。协程在这里暂停,等待主
循环发送时间,然后重新激活。
❻ 指定的行程数量完成后,for 循环结束,最后产出 ‘going home’
事件。此时,协程最后一次暂停。仿真主循环发送时间后,协程重新激
活;不过,这里没有把产出的值赋值给变量,因为用不到了。
❼ 协程执行到最后时,生成器对象抛出 StopIteration 异常。
你可以在 Python 控制台中调用 taxi_process 函数,自己“驾
驶”(drive)一辆出租车 ,如示例 16-21 所示。

示例 16-21 驱动 taxi_process 协程

>>> from taxi_sim import taxi_process
>>> taxi = taxi_process(ident=13, trips=2, start_time=0)>>> next(taxi) ➋
Event(time=0, proc=13, action='leave garage')
>>> taxi.send(_.time + 7) ➌
Event(time=7, proc=13, action='pick up passenger')>>> taxi.send(_.time + 23) ➎
Event(time=30, proc=13, action='drop off passenger')
>>> taxi.send(_.time + 5) ➏
Event(time=35, proc=13, action='pick up passenger')
>>> taxi.send(_.time + 48) ➐
Event(time=83, proc=13, action='drop off passenger')
>>> taxi.send(_.time + 1)
Event(time=84, proc=13, action='going home')>>> taxi.send(_.time + 10) ➒
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

❶ 创建一个生成器对象,表示一辆出租车。这辆出租车的编号是
13(ident=13),从 t=0 时开始工作,有两次行程。
❷ 预激协程;产出第一个事件。
❸ 现在可以发送当前时间。在控制台中,_ 变量绑定的是前一个结果;
这里我在时间上加 7,意思是这辆出租车 7 分钟后找到第一个乘客。
❹ 这个事件由 for 循环在第一个行程的开头产出。
❺ 发送 _.time + 23,表示第一个乘客的行程持续了 23 分钟。
❻ 然后,这辆出租车会徘徊 5 分钟。
❼ 最后一次行程持续 48 分钟。
❽ 两次行程完成后,for 循环结束,产出 ‘going home’ 事件。
❾ 如果尝试再把值发给协程,会执行到协程的末尾。协程返回后,解
释器会抛出 StopIteration 异常。

注意,在示例 16-21 中,我使用控制台模拟仿真主循环。我从 taxi 协
程产出的 Event 实例中获取 .time 属性,随意与一个数相加,然后调
用 taxi.send 方法发送两数之和,重新激活协程。在这个仿真系统
中,各个出租车协程由 Simulator.run 方法中的主循环驱动。仿
真“钟”保存在 sim_time 变量中,每次产出事件时都会更新仿真钟。

为了实例化 Simulator 类,taxi_sim.py 脚本的 main 函数构建了一个
taxis 字典,如下所示:

taxis = {i: taxi_process(i, (i + 1) * 2, i * DEPARTURE_INTERVAL)
for i in range(num_taxis)}
sim = Simulator(taxis)

DEPARTURE_INTERVAL 的值是 5;如果 num_taxis 的值与前面的运行
示例一样也是 3,这三行代码的作用与下述代码一样:

taxis = {0: taxi_process(ident=0, trips=2, start_time=0),
1: taxi_process(ident=1, trips=4, start_time=5),
2: taxi_process(ident=2, trips=6, start_time=10)}
sim = Simulator(taxis)

因此,taxis 字典的值是三个参数不同的生成器对象。例如,1 号出租
车从 start_time=5 时开始,寻找四个乘客。构建 Simulator 实例只
需这个字典参数。

Simulator.init 方法如示例 16-22 所示。Simulator 类的主要数据
结构如下。
self.events
  PriorityQueue 对象,保存 Event 实例。元素可以放进(使用
put 方法)PriorityQueue 对象中,然后按 item[0](即 Event 对象
的 time 属性)依序取出(使用 get 方法)。

**self.procs **
  一个字典,把出租车的编号映射到仿真过程中激活的进程(表示出
租车的生成器对象)。这个属性会绑定前面所示的 taxis 字典副本。

示例 16-22 taxi_sim.py:Simulator 类的初始化方法

class Simulator:
  def __init__(self, procs_map):
    self.events = queue.PriorityQueue() ➊
    self.procs = dict(procs_map)

❶ 保存排定事件的 PriorityQueue 对象,按时间正向排序。
❷ 获取的 procs_map 参数是一个字典(或其他映射),可是又从中构
建一个字典,创建本地副本,因为在仿真过程中,出租车回家后会从
self.procs 属性中移除,而我们不想修改用户传入的对象。

优先队列是离散事件仿真系统的基础构件:创建事件的顺序不定,放入
这种队列之后,可以按照各个事件排定的时间顺序取出。例如,可能会
把下面两个事件放入优先队列:

Event(time=14, proc=0, action='pick up passenger')
Event(time=11, proc=1, action='pick up passenger')

这两个事件的意思是,0 号出租车 14 分钟后拉到第一个乘客,而 1 号
出租车(time=10 时出发)1 分钟后(time=11)拉到乘客。如果这两
个事件在队列中,主循环从优先队列中获取的第一个事件将是
Event(time=11, proc=1, action=‘pick up passenger’)。

下面分析这个仿真系统的主算法——Simulator.run 方法。在 main 函
数中,实例化 Simulator 类之后立即就调用了这个方法,如下所示:

sim = Simulator(taxis)
sim.run(end_time)

Simulator 类带有注解的代码清单在示例 16-23 中,下面先概述
Simulator.run 方法实现的算法。

(1) 迭代表示各辆出租车的进程。
 a. 在各辆出租车上调用 next() 函数,预激协程。这样会产出各辆
出租车的第一个事件。
  b. 把各个事件放入 Simulator 类的 self.events 属性(队列)
中。

(2) 满足 sim_time < end_time 条件时,运行仿真系统的主循环。
  a. 检查 self.events 属性是否为空;如果为空,跳出循环。
  b. 从 self.events 中获取当前事件(current_event),即
PriorityQueue 对象中时间值最小的 Event 对象。
  c. 显示获取的 Event 对象。
  d.获取 current_event 的 time 属性,更新仿真时间。

e.把时间发给 current_event 的 proc 属性标识的协程,产出下一
个事件(next_event)。
  f.把 next_event 添加到 self.events 队列中,排定
next_event。

Simulator 类完整的代码如示例 16-23 所示。
示例 16-23 taxi_sim.py:Simulator,一个简单的离散事件仿真
类;关注的重点是 run 方法

class Simulator:
def __init__(self, procs_map):
  self.events = queue.PriorityQueue()
  self.procs = dict(procs_map)
def run(self, end_time):"""排定并显示事件,直到时间结束"""
  # 排定各辆出租车的第一个事件
  for _, proc in sorted(self.procs.items()): ➋
    first_event = next(proc) ➌
    self.events.put(first_event)# 这个仿真系统的主循环
    sim_time = 0while sim_time < end_time:if self.events.empty():print('*** end of events ***')
        break
      current_event = self.events.get() ➑
      sim_time, proc_id, previous_action = current_event ➒
      print('taxi:', proc_id, proc_id * ' ', current_event) ➓
      active_proc = self.procs[proc_id] ⓫
      next_time = sim_time + compute_duration(previous_action)try:
          next_event = active_proc.send(next_time)except StopIteration:
        del self.procs[proc_id]else:
      self.events.put(next_event)else: ⓰
    msg = '*** end of simulation time: {} events pending ***'
    print(msg.format(self.events.qsize()))

❶ run 方法只需要仿真结束时间(end_time)这一个参数。
❷ 使用 sorted 函数获取 self.procs 中按键排序的元素;用不到键,
因此赋值给 _。
❸ 调用 next(proc) 预激各个协程,向前执行到第一个 yield 表达
式,做好接收数据的准备。产出一个 Event 对象。
❹ 把各个事件添加到 self.events 属性表示的 PriorityQueue 对象
中。如示例 16-20 中的运行示例,各辆出租车的第一个事件是 ‘leave
garage’。
❺ 把 sim_time 变量(仿真钟)归零。
❻ 这个仿真系统的主循环:sim_time 小于 end_time 时运行。
❼ 如果队列中没有未完成的事件,退出主循环。
❽ 获取优先队列中 time 属性最小的 Event 对象;这是当前事件
(current_event)。
❾ 拆包 Event 对象中的数据。这一行代码会更新仿真钟 sim_time,对
应于事件发生时的时间。

❿ 显示 Event 对象,指明是哪辆出租车,并根据出租车的编号缩进。
⓫ 从 self.procs 字典中获取表示当前活动的出租车的协程。
⓬ 调用 compute_duration(…) 函数,传入前一个动作(例
如,‘pick up passenger’、‘drop off passenger’ 等),把结果
加到 sim_time 上,计算出下一次活动的时间。

⓭ 把计算得到的时间发给出租车协程。协程会产出下一个事件
(next_event),或者抛出 StopIteration 异常(完成时)。
⓮ 如果抛出了 StopIteration 异常,从 self.procs 字典中删除那个
协程。
⓯ 否则,把 next_event 放入队列中。
⓰ 如果循环由于仿真时间到了而退出,显示待完成的事件数量(有时
可能碰巧是零)。

注意,示例 16-23 中的 Simulator.run 方法有两处用到了第 15 章介绍
的 else 块,而且都不在 if 语句中。

  • 主 while 循环有一个 else 语句,报告仿真系统由于到达结束时间
    而结束,而不是由于没有事件要处理而结束。
  • 靠近主 while 循环底部那个 try 语句把 next_time 发给当前的出
    租车进程,尝试获取下一个事件(next_event),如果成功,执
    行 else 块,把 next_event 放入 self.events 队列中。
    我觉得,如果没有这两个 else 块,Simulator.run 方法的代码会有点
    难以阅读。

这个示例的要旨是说明如何在一个主循环中处理事件,以及如何通过发
送数据驱动协程。这是 asyncio 包底层的基本思想,我们在第 18 章会
学习这个包。

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐