老齐教室

线程:概念和实现(4)

翻译:老齐

译者注:与本文相关图书推荐:《Python大学实用教程》《跟老齐学Python:轻松入门》


第四部分

将队列应用于PCP

如果你希望一次能够处理管道中的多个值,就需要一种针对管道的数据结构,它相当于producer的备份,能实现数量增加和减少。

Python标准库有一个queue模块,该模块有一个Queue 类,下面将Pipeline改为Queue,就可以不再使用Lock锁定某些变量,此外,还将使用Python的threading模块中的Event来停止工作线程,这是一种与以往不同的方法。

Event开始。当有很多线程等待threading.Event实例的时候,它能够将一个线程标记为一个事件。这段代码的关键是,等待事件的线程不一定需要停止它们正在做的事情,它们可以每隔一段时间检查一次Event的状态。

很多事情都可以触发event。在本例中,主线程将简单地休眠一段时间,然后运行.set()

1
2
3
4
5
6
7
8
9
10
11
12
13
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
# logging.getLogger().setLevel(logging.DEBUG)
pipeline = Pipeline()
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)
time.sleep(0.1)
logging.info("Main: about to set event")
event.set()

这里唯一的变化是创建了event对象,然后将event作为参数传给后面的.submit方法,在with语句中,有一句要sleep一秒钟,再记录日志信息,最后调用event.set()

producer也不需要改变太多:

1
2
3
4
5
6
7
def producer(pipeline, event):
"""Pretend we're getting a number from the network."""
while not event.is_set():
message = random.randint(1, 101)
logging.info("Producer got message: %s", message)
pipeline.set_message(message, "Producer")
logging.info("Producer received EXIT event. Exiting")

while循环中不再为pipeline设置SENTINEL值。consumer需要相应做较大改动:

1
2
3
4
5
6
7
8
9
10
11
def consumer(pipeline, event):
"""Pretend we're saving a number in the database."""
while not event.is_set() or not pipeline.empty():
message = pipeline.get_message("Consumer")
logging.info(
"Consumer storing message: %s (queue size=%s)",
message,
pipeline.qsize(),
)

logging.info("Consumer received EXIT event. Exiting")

必须删除SENTINEL值相关的代码,while循环的条件也因此更复杂了一些,现在需要考虑not event.is_set()not pipeline.empty()两个条件,也就是未设置event,或者pipeline未清空时。

要确保在consumer进程结束是队列中已经是空的了,否则就会出现以下两种糟糕的情况。一是丢失了这些最终消息,但更严重的情况是第二种,producer如果视图将信息添加到完整队列中,会被锁住,从而不能返回。这种事件会发生在producer验证.is_set()条件之后,调用pipeline.set_message()之前。

这种事件会发生在producer验证.is_set()条件之后,调用pipeline.set_message()之前。

如果发生这种情况,producer可能会在队列仍然全满的情况下唤醒并退出。然后,调用.set_message().set_message()将一直等到队列中有新信息的空间。若consumer已经退出,这种情况就不会发生,而且producer不会退出。

consumer中的其他部分看起来应该很熟悉。

然而,Pipeline还需要重写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Pipeline(queue.Queue):
def __init__(self):
super().__init__(maxsize=10)

def get_message(self, name):
logging.debug("%s:about to get from queue", name)
value = self.get()
logging.debug("%s:got %d from queue", name, value)
return value

def set_message(self, value, name):
logging.debug("%s:about to add %d to queue", name, value)
self.put(value)
logging.debug("%s:added %d to queue", name, value)

上面的Pipelinequeue.Queue的子类。Queue 在初始化时指定一个可选参数,以指定队列的最大长度。

如果为maxsize指定一个正数,则该数字为队列元素个数的极限,如果达到该值,.put()方法被锁定,直到元素的数量少于maxsize才解锁。如果不指定maxsize,则队列将增长到计算机内存的所许可的最值。

.get_message().set_message()两个方法代码更少了,它们基本上把.get().put()封装在Queue中。你可能想知道防止线程发生竞态条件的锁都去了哪里。

编写标准库的核心开发人员知道,Queue经常在多线程环境中使用,于是将锁合并到Queue本身中。Queue对于线程来说是安全的。

此程序的运行如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
$ ./prodcom_queue.py
Producer got message: 32
Producer got message: 51
Producer got message: 25
Producer got message: 94
Producer got message: 29
Consumer storing message: 32 (queue size=3)
Producer got message: 96
Consumer storing message: 51 (queue size=3)
Producer got message: 6
Consumer storing message: 25 (queue size=3)
Producer got message: 31

[many lines deleted]

Producer got message: 80
Consumer storing message: 94 (queue size=6)
Producer got message: 33
Consumer storing message: 20 (queue size=6)
Producer got message: 48
Consumer storing message: 31 (queue size=6)
Producer got message: 52
Consumer storing message: 98 (queue size=6)
Main: about to set event
Producer got message: 13
Consumer storing message: 59 (queue size=6)
Producer received EXIT event. Exiting
Consumer storing message: 75 (queue size=6)
Consumer storing message: 97 (queue size=5)
Consumer storing message: 80 (queue size=4)
Consumer storing message: 33 (queue size=3)
Consumer storing message: 48 (queue size=2)
Consumer storing message: 52 (queue size=1)
Consumer storing message: 13 (queue size=0)
Consumer received EXIT event. Exiting

通读上述示例的输出,会发现,有的地方很有意思。在顶部,你可以看到producer必须创建5条信息并将其中4条放在队列中,队列中最前面的一条被操作系统换掉之后,第5条条信息才能加入队列。

然后consumer运行,把第1条信息拉了出来,它打印出了该信息以及队列在此时的长度:

1
Consumer storing message: 32 (queue size=3)

此时,标明第5条信息还没有进入pipeline ,删除单个信息后queue的减小到3。你也知道queue可以保存10条消息,因此queue线程不会被queue阻塞,它被操作系统置换了。

注意:你调试的输出结果会有所不同。你的输出将随着运行次数的不同而改变。这就是用线程工作的乐趣所在!

执行代码,你能看到主线程生成event事件,这会导致producer立即退出,consumer还有很多工作要做,所以它会一直运行,直到清理完pipeline

尝试操作大小不同的队列,并调用producerconsumer中的time.sleep(),以分别模拟更长的网络或磁盘访问时间。即使对程序的这些内容稍加更改,也会使结果产生很大差异。

这是解决发PCP的一个好方法,但是你可以进一步简化它,不需要使用Pipeline,一旦去掉日志记录,它就会变成一个queue.Queue

下面是直接使用queue.Queue的最终代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
"""Pretend we're getting a number from the network."""
while not event.is_set():
message = random.randint(1, 101)
logging.info("Producer got message: %s", message)
queue.put(message)

logging.info("Producer received event. Exiting")

def consumer(queue, event):
"""Pretend we're saving a number in the database."""
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info(
"Consumer storing message: %s (size=%d)", message, queue.qsize()
)

logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")

pipeline = queue.Queue(maxsize=10)
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)

time.sleep(0.1)
logging.info("Main: about to set event")
event.set()

这更易于阅读,并展示了如何使用Python的内置模块来简化复杂的问题。

LockQueue是便于解决并发问题的类,但标准库还提供了其他类。在结束本文之前,让我们浏览其中一些类。

Threading

Python的threading模块还提供了一些类,虽然上面的示例不需要这些,但是它们在不同的用例中可以派上用场,所以熟悉它们是有好处的。

Semaphore

threading.Semaphore有一些特殊属性的计数器对象,这里实现的计数具有原子性,意味着可以保证操作系统不会在递增或递减计数器的过程中交换线程。

内部计数器在调用.release()时递增,在调用.acquire()时递减。

另外一个特殊属性,如果一个线程在计数器为零时调用.acquire(),则该线程将被锁定,直到另一个线程调用.release(),并将计数器增加到1。

Semaphores通常用于保护容量有限的资源。例如,如果你有一个连接池,并且希望将该池的大小限制为特定的数目。

Timer

threading.Timer用于在经过一定时间后调度要调用的函数,你可以通过传入等待的秒数和调用的函数来创建Timer实例:

1
t = threading.Timer(30.0, my_function)

通过调用.start()启动Timer。在指定时间之后的某个时间点,将在新线程上调用该函数。但请注意,无法保证会在你希望的时间准确调用该函数。

如果要停止已经启动的Timer,可以调用.cancel()。如果在Timer触发后调用.cancel(),不会执行任何操作,也不会产生异常。

Timer可用于在特定时间后提示用户执行操作。如果用户在Timer过期之前执行操作,则可以调用.cancel()

Barrier

threading.Barrier可用于保持固定数量的线程同步。创建Barrier时,调用方必须指定将要同步的线程数。每个线程都调用Barrier.wait()方法,它们都将保持封锁状态,直到指定数量的线程在等待,然后全部同时释放。

请记住:线程是由操作系统调度的,因此,即使所有线程都是同时释放的,它们也将被调度为一次运行一个线程。

Barrier的一个用途是允许线程池对自身进行初始化。让这些线程初始化后在Barrier上等待,将确保在所有线程完成初始化之前,没有一个线程开始运行。

结论:Python中的线程

现在你已经了解了Python的threading提供的许多功能,以及一些如何写线程程序和用线程程序解决问题的示例。你还看到了在编写和调试线程程序时出现的一些问题。

原文链接:https://realpython.com/intro-to-python-threading/

关注微信公众号:老齐教室。读深度文章,得精湛技艺,享绚丽人生。

使用支付宝打赏
使用微信打赏

若你觉得我的文章对你有帮助,欢迎点击上方按钮对我打赏

关注微信公众号,读文章、听课程,提升技能