线程:概念和实现(4)
2020-02-24
翻译:老齐
译者注:与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》
第四部分
将队列应用于PCP
如果你希望一次能够处理管道中的多个值,就需要一种针对管道的数据结构,它相当于producer
的备份,能实现数量增加和减少。
Python标准库有一个queue
模块,该模块有一个Queue
类,下面将Pipeline
改为Queue
,就可以不再使用Lock
锁定某些变量,此外,还将使用Python的threading
模块中的Event
来停止工作线程,这是一种与以往不同的方法。
从Event
开始。当有很多线程等待threading.Event
实例的时候,它能够将一个线程标记为一个事件。这段代码的关键是,等待事件的线程不一定需要停止它们正在做的事情,它们可以每隔一段时间检查一次Event
的状态。
很多事情都可以触发event
。在本例中,主线程将简单地休眠一段时间,然后运行.set()
:
1 | if __name__ == "__main__": |
这里唯一的变化是创建了event
对象,然后将event
作为参数传给后面的.submit
方法,在with语句中,有一句要sleep一秒钟,再记录日志信息,最后调用event.set()
。
producer
也不需要改变太多:
1 | def producer(pipeline, event): |
while
循环中不再为pipeline
设置SENTINEL
值。consumer
需要相应做较大改动:
1 | def consumer(pipeline, event): |
必须删除SENTINEL
值相关的代码,while
循环的条件也因此更复杂了一些,现在需要考虑not event.is_set()
和not pipeline.empty()
两个条件,也就是未设置event
,或者pipeline
未清空时。
要确保在consumer
进程结束是队列中已经是空的了,否则就会出现以下两种糟糕的情况。一是丢失了这些最终消息,但更严重的情况是第二种,producer
如果视图将信息添加到完整队列中,会被锁住,从而不能返回。这种事件会发生在producer
验证.is_set()
条件之后,调用pipeline.set_message()
之前。
这种事件会发生在producer
验证.is_set()
条件之后,调用pipeline.set_message()
之前。
如果发生这种情况,producer
可能会在队列仍然全满的情况下唤醒并退出。然后,调用.set_message()
,.set_message()
将一直等到队列中有新信息的空间。若consumer
已经退出,这种情况就不会发生,而且producer
不会退出。
consumer
中的其他部分看起来应该很熟悉。
然而,Pipeline
还需要重写:
1 | class Pipeline(queue.Queue): |
上面的Pipeline
是queue.Queue
的子类。Queue
在初始化时指定一个可选参数,以指定队列的最大长度。
如果为maxsize
指定一个正数,则该数字为队列元素个数的极限,如果达到该值,.put()
方法被锁定,直到元素的数量少于maxsize
才解锁。如果不指定maxsize
,则队列将增长到计算机内存的所许可的最值。
.get_message()
和.set_message()
两个方法代码更少了,它们基本上把.get()
和.put()
封装在Queue
中。你可能想知道防止线程发生竞态条件的锁都去了哪里。
编写标准库的核心开发人员知道,Queue
经常在多线程环境中使用,于是将锁合并到Queue
本身中。Queue
对于线程来说是安全的。
此程序的运行如下所示:
1 | $ ./prodcom_queue.py |
通读上述示例的输出,会发现,有的地方很有意思。在顶部,你可以看到producer
必须创建5条信息并将其中4条放在队列中,队列中最前面的一条被操作系统换掉之后,第5条条信息才能加入队列。
然后consumer
运行,把第1条信息拉了出来,它打印出了该信息以及队列在此时的长度:
1 | Consumer storing message: 32 (queue size=3) |
此时,标明第5条信息还没有进入pipeline
,删除单个信息后queue
的减小到3。你也知道queue
可以保存10条消息,因此queue
线程不会被queue
阻塞,它被操作系统置换了。
注意:你调试的输出结果会有所不同。你的输出将随着运行次数的不同而改变。这就是用线程工作的乐趣所在!
执行代码,你能看到主线程生成event事件,这会导致producer
立即退出,consumer
还有很多工作要做,所以它会一直运行,直到清理完pipeline
。
尝试操作大小不同的队列,并调用producer
或consumer
中的time.sleep()
,以分别模拟更长的网络或磁盘访问时间。即使对程序的这些内容稍加更改,也会使结果产生很大差异。
这是解决发PCP的一个好方法,但是你可以进一步简化它,不需要使用Pipeline
,一旦去掉日志记录,它就会变成一个queue.Queue
。
下面是直接使用queue.Queue
的最终代码:
1 | import concurrent.futures |
这更易于阅读,并展示了如何使用Python的内置模块来简化复杂的问题。
Lock
和 Queue
是便于解决并发问题的类,但标准库还提供了其他类。在结束本文之前,让我们浏览其中一些类。
Threading
Python的threading
模块还提供了一些类,虽然上面的示例不需要这些,但是它们在不同的用例中可以派上用场,所以熟悉它们是有好处的。
Semaphore
threading.Semaphore
有一些特殊属性的计数器对象,这里实现的计数具有原子性,意味着可以保证操作系统不会在递增或递减计数器的过程中交换线程。
内部计数器在调用.release()
时递增,在调用.acquire()
时递减。
另外一个特殊属性,如果一个线程在计数器为零时调用.acquire()
,则该线程将被锁定,直到另一个线程调用.release()
,并将计数器增加到1。
Semaphores
通常用于保护容量有限的资源。例如,如果你有一个连接池,并且希望将该池的大小限制为特定的数目。
Timer
threading.Timer
用于在经过一定时间后调度要调用的函数,你可以通过传入等待的秒数和调用的函数来创建Timer
实例:
1 | t = threading.Timer(30.0, my_function) |
通过调用.start()
启动Timer
。在指定时间之后的某个时间点,将在新线程上调用该函数。但请注意,无法保证会在你希望的时间准确调用该函数。
如果要停止已经启动的Timer
,可以调用.cancel()
。如果在Timer
触发后调用.cancel()
,不会执行任何操作,也不会产生异常。
Timer
可用于在特定时间后提示用户执行操作。如果用户在Timer
过期之前执行操作,则可以调用.cancel()
。
Barrier
threading.Barrier
可用于保持固定数量的线程同步。创建Barrier
时,调用方必须指定将要同步的线程数。每个线程都调用Barrier
的.wait()
方法,它们都将保持封锁状态,直到指定数量的线程在等待,然后全部同时释放。
请记住:线程是由操作系统调度的,因此,即使所有线程都是同时释放的,它们也将被调度为一次运行一个线程。
Barrier
的一个用途是允许线程池对自身进行初始化。让这些线程初始化后在Barrier
上等待,将确保在所有线程完成初始化之前,没有一个线程开始运行。
结论:Python中的线程
现在你已经了解了Python的threading
提供的许多功能,以及一些如何写线程程序和用线程程序解决问题的示例。你还看到了在编写和调试线程程序时出现的一些问题。
原文链接:https://realpython.com/intro-to-python-threading/
关注微信公众号:老齐教室。读深度文章,得精湛技艺,享绚丽人生。
若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏
关注微信公众号,读文章、听课程,提升技能