>>> from env_helper import info; info()
页面更新时间: 2023-12-27 09:31:19
运行环境:
    Linux发行版本: Debian GNU/Linux 12 (bookworm)
    操作系统内核: Linux-6.1.0-16-amd64-x86_64-with-glibc2.36
    Python版本: 3.11.2

4.12. 使用Queue使多线程编程更安全

曾经有这么一个说法,程序中存在3种类型的bug:你的bug、我的bug和多线程。 这虽然是句调侃,但从某种程度上道出了一个事实:多线程编程不是件容易的亊情。 线程间的 同步和互斥,线程间数据的共享等这些都是涉及线程安全要考虑的问题。 纵然Python中提供 了众多的同步和互斥机制,如mutex、condition、event等.但同步和互斥本身就不是一个容 易的话题, 稍有不慎就会陷人死锁状态或者威胁线程安全。

我们来看一个经典的多线程同步问题:生产者消费者模型。 如果用Python来实现,你会怎么写?大概思路是这样的:分别创建消费者和生产者线程,生产者往队列里曲放产品,消费者从队列里面取出产品,创建一个 线程锁以保证线程间操作的互斥性。

当队列满的时候消费者进人等待状态,当队列空的时候 生产者进人等待状态。我们来看一个具体的Python实现:

>>> import queue
>>> import threading
>>> import random
>>>
>>> writelock = threading.Lock()      #创建锁对象用于控制输出
>>>
>>> class Producer(threading.Thread):
>>>     def __init__(self,q,con,name):
>>>         super(Producer, self).__init__()
>>>         self.q = q
>>>         self.name = name
>>>         self.con = con
>>>         print ("Producer"+self.name+"Started")
>>>     def tun (self):
>>>         while 1:
>>>             global writelock
>>>             self.con.acquire ()  # 获取锁对象
>>>             if self.q.full():  # 队列满
>>>                 with writelock: # 输出信息
>>>                     print('Queue is full,producer wait!')
>>>                 self.con.wait()   # 等待资源
>>>             else:
>>>                 value = random.randint(0,10)
>>>                 with writelock:
>>>                     print (self.name +" put value "+ self.name+":"+ str(value)+"into queue")
>>>                 self.q.put((self.name+" : "+str(value))) # 放入队列中
>>>                 self.con.notify()                         # 通知消费者
>>>             self.con. release()                      # 释放锁对象
>>> class Consumer(threading.Thread):
>>>     def __init__(self, q,con,name):
>>>         super(Consumer, self).__init__()
>>>         self.q = q
>>>         self.con = con
>>>         self.name = name
>>>         print ("Consumer " + self.name +"started \n ")
>>>     def run(self):
>>>         while 1:
>>>             global writelock
>>>             self.con.acquire()
>>>             if self.q.empty():
>>>                 with writelock:
>>>                     print('queue is empty,consumer wait!')
>>>                 self.con.wait()
>>>             else:
>>>                 value = self.q.get()
>>>                 with writelock:
>>>                     print (self.name +"get value"+value+"from queue")
>>>                 self.con.notify()
>>>             self.con.release()
>>> if __name__ == "__main__":
>>>      q = queue.Queue(10)
>>>      con = threading.Condition()   # 条件变量锁
>>>      p = Producer(q,con,"P1")
>>>      p.start()
>>>      p1 = Producer(q,con,"P2")
>>>      p1.start()
>>>      c1 = Consumer(q,con,"C1")
>>>      c1.start()
ProducerP1Started
ProducerP2Started
Consumer C1started

queue is empty,consumer wait!

上面的程序实现有什么问题吗?回答这个问题之前。我们先来了解一下Queue模块的基 本知识。Python中的Queue模块提供了 3种队列:

  • Queue.Queue(maxsize):先进先出,maxsize为队列大小,其值为非正数的时候为 无限循环队列。

  • Queue.LifoQueue(maxsize):后进先出,相当于栈。

  • Queue.PriorityQueue(maxsize):优先级队列。

这3种队列支持以下方法:

  • Queue.qsize():返回近似的队列大小。注意,这里之所以加“近似”二字,是因为 当该值>0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效。

  • Queue.empty():列队为空的时候返回True,否则返回False。

  • Queue.full():当设定了队列大小的情况下.如果队列满则返回True,否则返回False。

  • Queue.put(item[, block[,timeout]]):往队列中添加元素 item, block 设置为 False 的 时候。如果队列满则抛出Full异常。如果block设置为True, timeout为None的时候 则会一直等待直到有空位置,否则会根据timeout的设定超时后抛出Full异常。

  • Queue.put_nowait(item):等价于 put(item,False).block 设置为 False 的时候,如果队 列空则抛出Empty异常。如果block设置为True、timeout为None的时候则会一直等 待直到有元素可用,否则会根据timeout的设定起时后抛出Empty异常。

  • Queue.get([block[,timeout]]):从队列中删除元索并返回该元素的值。

  • Queue.get_nowait():等价于 get(False)。

  • Queue.task_done():发送信号表明入列任务已经完成,经常在消费者线程中用到。

  • Queue.join():阻塞直至队列中所有的元素处理完毕。

Queue模块实现了多个生产者多个消费者的队列,当多线程之间需要信息安全的交换的 时候特别有用,因此这个模块实现了所需要的锁原语,为Python多线程编程提供了有力的支 持,它是线程安全的。需要注意的是Queue模块中的列队利collections.deque所表示的队列 并不一样,前者主要用于不同线程之间的通信,它内部实现了线程的锁机制;而后者主要是 数据结构上的概念,因此支持in方法。 再回过头来看看前面的例子,程序的实现冇什么问题呢?答案很明显,作用于queue操 作的条件变量完全是不需要的,因为queue本身能够保证线程安全,因此不需要额外的同步 机制。那么,该如何修改呢?请读者自行思考。下面的多线程下载的例子也许有助于你完成 上面程序的修改。

>>> import os
>>> import queue
>>> import threading
>>> import urllib.request
>>>
>>> class DownloadThread(threading.Thread):
>>>     def __init__(self, queue):
>>>         threading.Thread.__init__(self)
>>>         self.queue = queue
>>>     def run(self):
>>>         while True:
>>>             url = self.queue.get()           #   从队列中取出一个url元素
>>>             print(self.name+"begin download"+ url +"...")
>>>             self.download_file(url) # 进行文件下载
>>>             self.queue.task_done()  # 下载完毕发送信号
>>>             print(self.name + "download comleted!!!")
>>>     def download_file(self,url):
>>>         urlhandler = urllib.request.urlopen(url)
>>>         fname = os .path .basename (url)+".html " #文件名称
>>>         with open (fname,"wb") as f:    # 打开文件
>>>             while True:
>>>                 chunk = urlhandler.read(1024)
>>>                 if not chunk: break
>>>                 f.write(chunk)
>>> if __name__ == "__main__":
>>>     urls = [ "http://wiki.python.org/moin/WebProgramming",
>>>             "https://www.createspace.com/3611970",
>>>             "http ://wiki.python.org/moin/Documentation"
>>> ]
>>>     queue = queue.Queue()
>>>     # create a thread pool and give them a queue
>>>     for i in range(5):
>>>         t=DownloadThread(queue) #启动5个技程同时进行下4E
>>>         t.setDaemon(True)
>>>         t.start()
>>>     # giva the queue some data
>>>     for url in urls:
>>>         queue.put(url)
>>>     # wait for the queue to finish
>>>     queue.join()
/tmp/ipykernel_20318/3794936594.py:34: DeprecationWarning: setDaemon() is deprecated, set the daemon attribute instead
  t.setDaemon(True)
Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "/tmp/ipykernel_20318/3794936594.py", line 14, in run
  File "/tmp/ipykernel_20318/3794936594.py", line 18, in download_file
  File "/usr/lib/python3.11/urllib/request.py", line 216, in urlopen
    return opener.open(url, data, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/urllib/request.py", line 519, in open
    response = self._open(req, data)
               ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/urllib/request.py", line 541, in _open
    return self._call_chain(self.handle_open, 'unknown',
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/urllib/request.py", line 496, in _call_chain
    result = func(*args)
             ^^^^^^^^^^^
  File "/usr/lib/python3.11/urllib/request.py", line 1419, in unknown_open
    raise URLError('unknown url type: %s' % type)
urllib.error.URLError: <urlopen error unknown url type: http >
Thread-8begin downloadhttp://wiki.python.org/moin/WebProgramming...
Thread-9begin downloadhttps://www.createspace.com/3611970...
Thread-10begin downloadhttp ://wiki.python.org/moin/Documentation...
Thread-8download comleted!!!