老齐教室

线程:概念和实现(3)

翻译:老齐

译者注:与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》


第三部分

生产者-消费者线程

生产者-消费者问题(Producer-Consumer Problem,以下简称:PCP)是计算机科学中研究线程或进程同步的代表性问题,下面要通过它的一个变体来了解Python中threading模块提供的各种方法。

对于本例,你将想象一个程序需要从网络读取信息并将其写入磁盘。程序会确定是否要请求信息。它必须监听并接受信息,这些信息不会以正常的速度传入,而是会以突发的方式传入。程序的这一部分叫做生产者。

另一方面,一旦收到信息,你就需要将其写入数据库。数据库访问速度很慢,但这个速度足以跟上信息传输的平均速度。当一大堆信息进来时,访问速度还不够快。这部分是消费者。

在生产者和消费者之间,创建一个Pipeline,它将随着你对不同的同步对象的了解而变化。

这是基本的布局。让我们看看使用Lock的解决方案。它并不完美,但它使用的工具是你已经知道的,所以这是一个很好的开始。

使用锁的PCP

因为这是一篇关于Python的threading模块的文章,而且你刚刚阅读了Lock的使用方法,,所以让我们尝试用一两个使用Lock的线程来解决这个问题。

一般的设计是,有一个producer线程从模拟网络读取消息并将信息放入Pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
import random 

SENTINEL = object()

def producer(pipeline):
"""Pretend we're getting a message from the network."""
for index in range(10):
message = random.randint(1, 101)
logging.info("Producer got message: %s", message)
pipeline.set_message(message, "Producer")

# Send a sentinel message to tell consumer we're done
pipeline.set_message(SENTINEL, "Producer")

要生成模拟信息,producer中会生成一个介于1和101(不含101)之间的随机整数,然后调用pipeline.set_message(),将其发送到consumer

producer还使用SENTINEL值作为标记,当向consumer发送了10个值,就停止发送。这有点尴尬,但不要担心,在完成这个示例之后,你将看到消除这个SENTINEL值的方法。

pipeline的另一边是消费者:

1
2
3
4
5
6
7
def consumer(pipeline):
"""Pretend we're saving a number in the database."""
message = 0
while message is not SENTINEL:
message = pipeline.get_message("Consumer")
if message is not SENTINEL:
logging.info("Consumer storing message: %s", message)

consumerpipeline中读取一条信息并将其写入一个虚拟数据库,在本例中,只是将信息打印到显示器上。如果它得到SENTINEL值,就结束函数执行过程,该函数将终止线程。

在看真正有趣Pipeline部分之前,这里是__main__的代码,它产生了以下线程:

1
2
3
4
5
6
7
8
9
10
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
# logging.getLogger().setLevel(logging.DEBUG)

pipeline = Pipeline()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline)
executor.submit(consumer, pipeline)

这看起来应该相当熟悉,因为它接近前面示例中的__main__代码。

请记住,你可以通过取消注释行打开DEBUG日志记录,以查看所有日志记录消息:

1
# logging.getLogger().setLevel(logging.DEBUG)

通过DEBUG日志信息来查看每个线程获取和释放锁的确切位置是值得的。

现在让我们看看将信息从producer传递给消费者的管道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Pipeline:
"""
Class to allow a single element pipeline between producer and consumer.
"""
def __init__(self):
self.message = 0
self.producer_lock = threading.Lock()
self.consumer_lock = threading.Lock()
self.consumer_lock.acquire()

def get_message(self, name):
logging.debug("%s:about to acquire getlock", name)
self.consumer_lock.acquire()
logging.debug("%s:have getlock", name)
message = self.message
logging.debug("%s:about to release setlock", name)
self.producer_lock.release()
logging.debug("%s:setlock released", name)
return message

def set_message(self, message, name):
logging.debug("%s:about to acquire setlock", name)
self.producer_lock.acquire()
logging.debug("%s:have setlock", name)
self.message = message
logging.debug("%s:about to release getlock", name)
self.consumer_lock.release()
logging.debug("%s:getlock released", name)

哇!这么多代码。其中相当大的一部分只是日志语句,以便在运行代码时更容易看到发生了什么。下面是删除所有日志记录语句后的相同代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Pipeline:
"""
Class to allow a single element pipeline between producer and consumer.
"""
def __init__(self):
self.message = 0
self.producer_lock = threading.Lock()
self.consumer_lock = threading.Lock()
self.consumer_lock.acquire()

def get_message(self, name):
self.consumer_lock.acquire()
message = self.message
self.producer_lock.release()
return message

def set_message(self, message, name):
self.producer_lock.acquire()
self.message = message
self.consumer_lock.release()

这似乎更容易处理。此版本代码中的Pipeline有三个成员:

  • .message存储要传递的信息。
  • .producer_lockthreading.Lock实例对象,在producer线程中,用它控制对信息的访问
  • .consumer_lock也是threading.Lock实例对象,它在consumer线程控制对信息的访问。

__init__()初始化这三个成员,然后调用.consumer_lock上的.acquire()。这是你想开始的状态。允许producer添加新信息,但consumer需要等待信息出现。

.get_message().set_messages()几乎相反。.get_message()调用consumer_lock上的.acquire(),它让consumer等待信息准备就绪。

一旦consumer获得了.consumer_lock,它就会复制出.message中的值,然后调用.producer_lock上的.release(),释放锁,允许producer将下一条信息插入到pipeline中。

在运行.set_message()之前,要注意.get_message()中的一个细节,通常以return self.message结束方法,但是此处不这样做,看看你能否弄清楚原因。

答案在此。一旦consumer调用.producer_lock.release(),它就会与producer交换位置,producer开始运行,这种情况可能在.release()返回之前发生!这意味着,当函数returns self.message时,有比较小的概率会生成下一条信息,因此你将丢失第一条信息。这是另一个竞态的例子。

转到.set_message(),可以看到事务的另一面,producer会用一条信息来调用它,获取.producer_lock,设置.message,然后调用consumer_lock上的.release()。这样就使得用户可以读取该值。

将日志设置为WARNING并执行代码,看看它是什么样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$ ./prodcom_lock.py
Producer got data 43
Producer got data 45
Consumer storing data: 43
Producer got data 86
Consumer storing data: 45
Producer got data 40
Consumer storing data: 86
Producer got data 62
Consumer storing data: 40
Producer got data 15
Consumer storing data: 62
Producer got data 16
Consumer storing data: 15
Producer got data 61
Consumer storing data: 16
Producer got data 73
Consumer storing data: 61
Producer got data 22
Consumer storing data: 73
Consumer storing data: 22

一开始,你可能会发现奇怪的是,producerconsumer运行之前就收到两条信息。如果回顾一下producer.set_message(),你会注意到,当producer视图将信息发送到pipeline时,会等待Lock。这是在producer收到信息和日志之后完成的。

producer尝试发送第二条信息时,它将第二次调用.set_message(),并且它将被锁定。

操作系统可以在任何时候交换线程,但它通常会让每个线程在交换之前有一个合理的运行时间。这就是为什么producer通常运行到它在第二次调用.set_message()时被锁定为止。

但是,一旦某个线程被锁定,操作系统就会将其交换出去,并找到另一个要运行的线程,此时的另一个线程就是consumer

consumer调用.get_message(),该函数读取信息并调用.producer_lock上的.release(),从而允许producer在下次交换线程时再次运行。

注意,第一条消息是43,这正是consumer读的内容,尽管 producer已经生成了45这条信息。

以上是有限的测试,并没有很好地解决PCP,因为它一次只允许管道中的有一个值。当producer收到大量信息时,它将无处安放这些信息。

让我们使用Queue寻找一个更好的方法来解决这个问题。

未完待续

关注微信公众号:老齐教室。读深度文章,得精湛技艺,享绚丽人生。

使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

关注微信公众号,读文章、听课程,提升技能