线程:概念和实现(3)
2020-02-18
翻译:老齐
译者注:与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》
第三部分
生产者-消费者线程
生产者-消费者问题(Producer-Consumer Problem,以下简称:PCP)是计算机科学中研究线程或进程同步的代表性问题,下面要通过它的一个变体来了解Python中threading
模块提供的各种方法。
对于本例,你将想象一个程序需要从网络读取信息并将其写入磁盘。程序会确定是否要请求信息。它必须监听并接受信息,这些信息不会以正常的速度传入,而是会以突发的方式传入。程序的这一部分叫做生产者。
另一方面,一旦收到信息,你就需要将其写入数据库。数据库访问速度很慢,但这个速度足以跟上信息传输的平均速度。当一大堆信息进来时,访问速度还不够快。这部分是消费者。
在生产者和消费者之间,创建一个Pipeline
,它将随着你对不同的同步对象的了解而变化。
这是基本的布局。让我们看看使用Lock
的解决方案。它并不完美,但它使用的工具是你已经知道的,所以这是一个很好的开始。
使用锁的PCP
因为这是一篇关于Python的threading
模块的文章,而且你刚刚阅读了Lock
的使用方法,,所以让我们尝试用一两个使用Lock
的线程来解决这个问题。
一般的设计是,有一个producer
线程从模拟网络读取消息并将信息放入Pipeline
:
1 | import random |
要生成模拟信息,producer
中会生成一个介于1和101(不含101)之间的随机整数,然后调用pipeline
的.set_message()
,将其发送到consumer
。
producer
还使用SENTINEL
值作为标记,当向consumer
发送了10个值,就停止发送。这有点尴尬,但不要担心,在完成这个示例之后,你将看到消除这个SENTINEL
值的方法。
在pipeline
的另一边是消费者:
1 | def consumer(pipeline): |
consumer
从pipeline
中读取一条信息并将其写入一个虚拟数据库,在本例中,只是将信息打印到显示器上。如果它得到SENTINEL
值,就结束函数执行过程,该函数将终止线程。
在看真正有趣Pipeline
部分之前,这里是__main__
的代码,它产生了以下线程:
1 | if __name__ == "__main__": |
这看起来应该相当熟悉,因为它接近前面示例中的__main__
代码。
请记住,你可以通过取消注释行打开DEBUG
日志记录,以查看所有日志记录消息:
1 | # logging.getLogger().setLevel(logging.DEBUG) |
通过DEBUG
日志信息来查看每个线程获取和释放锁的确切位置是值得的。
现在让我们看看将信息从producer
传递给消费者的管道:
1 | class Pipeline: |
哇!这么多代码。其中相当大的一部分只是日志语句,以便在运行代码时更容易看到发生了什么。下面是删除所有日志记录语句后的相同代码:
1 | class Pipeline: |
这似乎更容易处理。此版本代码中的Pipeline
有三个成员:
.message
存储要传递的信息。.producer_lock
是threading.Lock
实例对象,在producer
线程中,用它控制对信息的访问.consumer_lock
也是threading.Lock
实例对象,它在consumer
线程控制对信息的访问。
__init__()
初始化这三个成员,然后调用.consumer_lock
上的.acquire()
。这是你想开始的状态。允许producer
添加新信息,但consumer
需要等待信息出现。
.get_message()
和 .set_messages()
几乎相反。.get_message()
调用consumer_lock
上的.acquire()
,它让consumer
等待信息准备就绪。
一旦consumer
获得了.consumer_lock
,它就会复制出.message
中的值,然后调用.producer_lock
上的.release()
,释放锁,允许producer
将下一条信息插入到pipeline
中。
在运行.set_message()
之前,要注意.get_message()
中的一个细节,通常以return self.message
结束方法,但是此处不这样做,看看你能否弄清楚原因。
答案在此。一旦consumer
调用.producer_lock.release()
,它就会与producer
交换位置,producer
开始运行,这种情况可能在.release()
返回之前发生!这意味着,当函数returns self.message
时,有比较小的概率会生成下一条信息,因此你将丢失第一条信息。这是另一个竞态的例子。
转到.set_message()
,可以看到事务的另一面,producer
会用一条信息来调用它,获取.producer_lock
,设置.message
,然后调用consumer_lock
上的.release()
。这样就使得用户可以读取该值。
将日志设置为WARNING
并执行代码,看看它是什么样子的:
1 | $ ./prodcom_lock.py |
一开始,你可能会发现奇怪的是,producer
在consumer
运行之前就收到两条信息。如果回顾一下producer
和.set_message()
,你会注意到,当producer
视图将信息发送到pipeline
时,会等待Lock
。这是在producer
收到信息和日志之后完成的。
当producer
尝试发送第二条信息时,它将第二次调用.set_message()
,并且它将被锁定。
操作系统可以在任何时候交换线程,但它通常会让每个线程在交换之前有一个合理的运行时间。这就是为什么producer
通常运行到它在第二次调用.set_message()
时被锁定为止。
但是,一旦某个线程被锁定,操作系统就会将其交换出去,并找到另一个要运行的线程,此时的另一个线程就是consumer
。
consumer
调用.get_message()
,该函数读取信息并调用.producer_lock
上的.release()
,从而允许producer
在下次交换线程时再次运行。
注意,第一条消息是43,这正是consumer
读的内容,尽管 producer
已经生成了45这条信息。
以上是有限的测试,并没有很好地解决PCP,因为它一次只允许管道中的有一个值。当producer
收到大量信息时,它将无处安放这些信息。
让我们使用Queue
寻找一个更好的方法来解决这个问题。
(未完待续)
关注微信公众号:老齐教室。读深度文章,得精湛技艺,享绚丽人生。
若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏
关注微信公众号,读文章、听课程,提升技能