>>> from env_helper import info; info()
页面更新时间: 2023-12-27 09:31:19
运行环境:
Linux发行版本: Debian GNU/Linux 12 (bookworm)
操作系统内核: Linux-6.1.0-16-amd64-x86_64-with-glibc2.36
Python版本: 3.11.2
4.12. 使用Queue使多线程编程更安全¶
曾经有这么一个说法,程序中存在3种类型的bug:你的bug、我的bug和多线程。 这虽然是句调侃,但从某种程度上道出了一个事实:多线程编程不是件容易的亊情。 线程间的 同步和互斥,线程间数据的共享等这些都是涉及线程安全要考虑的问题。 纵然Python中提供 了众多的同步和互斥机制,如mutex、condition、event等.但同步和互斥本身就不是一个容 易的话题, 稍有不慎就会陷人死锁状态或者威胁线程安全。
我们来看一个经典的多线程同步问题:生产者消费者模型。 如果用Python来实现,你会怎么写?大概思路是这样的:分别创建消费者和生产者线程,生产者往队列里曲放产品,消费者从队列里面取出产品,创建一个 线程锁以保证线程间操作的互斥性。
当队列满的时候消费者进人等待状态,当队列空的时候 生产者进人等待状态。我们来看一个具体的Python实现:
>>> import queue
>>> import threading
>>> import random
>>>
>>> writelock = threading.Lock() #创建锁对象用于控制输出
>>>
>>> class Producer(threading.Thread):
>>> def __init__(self,q,con,name):
>>> super(Producer, self).__init__()
>>> self.q = q
>>> self.name = name
>>> self.con = con
>>> print ("Producer"+self.name+"Started")
>>> def tun (self):
>>> while 1:
>>> global writelock
>>> self.con.acquire () # 获取锁对象
>>> if self.q.full(): # 队列满
>>> with writelock: # 输出信息
>>> print('Queue is full,producer wait!')
>>> self.con.wait() # 等待资源
>>> else:
>>> value = random.randint(0,10)
>>> with writelock:
>>> print (self.name +" put value "+ self.name+":"+ str(value)+"into queue")
>>> self.q.put((self.name+" : "+str(value))) # 放入队列中
>>> self.con.notify() # 通知消费者
>>> self.con. release() # 释放锁对象
>>> class Consumer(threading.Thread):
>>> def __init__(self, q,con,name):
>>> super(Consumer, self).__init__()
>>> self.q = q
>>> self.con = con
>>> self.name = name
>>> print ("Consumer " + self.name +"started \n ")
>>> def run(self):
>>> while 1:
>>> global writelock
>>> self.con.acquire()
>>> if self.q.empty():
>>> with writelock:
>>> print('queue is empty,consumer wait!')
>>> self.con.wait()
>>> else:
>>> value = self.q.get()
>>> with writelock:
>>> print (self.name +"get value"+value+"from queue")
>>> self.con.notify()
>>> self.con.release()
>>> if __name__ == "__main__":
>>> q = queue.Queue(10)
>>> con = threading.Condition() # 条件变量锁
>>> p = Producer(q,con,"P1")
>>> p.start()
>>> p1 = Producer(q,con,"P2")
>>> p1.start()
>>> c1 = Consumer(q,con,"C1")
>>> c1.start()
ProducerP1Started
ProducerP2Started
Consumer C1started
queue is empty,consumer wait!
上面的程序实现有什么问题吗?回答这个问题之前。我们先来了解一下Queue模块的基 本知识。Python中的Queue模块提供了 3种队列:
Queue.Queue(maxsize):先进先出,maxsize为队列大小,其值为非正数的时候为 无限循环队列。
Queue.LifoQueue(maxsize):后进先出,相当于栈。
Queue.PriorityQueue(maxsize):优先级队列。
这3种队列支持以下方法:
Queue.qsize():返回近似的队列大小。注意,这里之所以加“近似”二字,是因为 当该值>0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效。
Queue.empty():列队为空的时候返回True,否则返回False。
Queue.full():当设定了队列大小的情况下.如果队列满则返回True,否则返回False。
Queue.put(item[, block[,timeout]]):往队列中添加元素 item, block 设置为 False 的 时候。如果队列满则抛出Full异常。如果block设置为True, timeout为None的时候 则会一直等待直到有空位置,否则会根据timeout的设定超时后抛出Full异常。
Queue.put_nowait(item):等价于 put(item,False).block 设置为 False 的时候,如果队 列空则抛出Empty异常。如果block设置为True、timeout为None的时候则会一直等 待直到有元素可用,否则会根据timeout的设定起时后抛出Empty异常。
Queue.get([block[,timeout]]):从队列中删除元索并返回该元素的值。
Queue.get_nowait():等价于 get(False)。
Queue.task_done():发送信号表明入列任务已经完成,经常在消费者线程中用到。
Queue.join():阻塞直至队列中所有的元素处理完毕。
Queue模块实现了多个生产者多个消费者的队列,当多线程之间需要信息安全的交换的 时候特别有用,因此这个模块实现了所需要的锁原语,为Python多线程编程提供了有力的支 持,它是线程安全的。需要注意的是Queue模块中的列队利collections.deque所表示的队列 并不一样,前者主要用于不同线程之间的通信,它内部实现了线程的锁机制;而后者主要是 数据结构上的概念,因此支持in方法。 再回过头来看看前面的例子,程序的实现冇什么问题呢?答案很明显,作用于queue操 作的条件变量完全是不需要的,因为queue本身能够保证线程安全,因此不需要额外的同步 机制。那么,该如何修改呢?请读者自行思考。下面的多线程下载的例子也许有助于你完成 上面程序的修改。
>>> import os
>>> import queue
>>> import threading
>>> import urllib.request
>>>
>>> class DownloadThread(threading.Thread):
>>> def __init__(self, queue):
>>> threading.Thread.__init__(self)
>>> self.queue = queue
>>> def run(self):
>>> while True:
>>> url = self.queue.get() # 从队列中取出一个url元素
>>> print(self.name+"begin download"+ url +"...")
>>> self.download_file(url) # 进行文件下载
>>> self.queue.task_done() # 下载完毕发送信号
>>> print(self.name + "download comleted!!!")
>>> def download_file(self,url):
>>> urlhandler = urllib.request.urlopen(url)
>>> fname = os .path .basename (url)+".html " #文件名称
>>> with open (fname,"wb") as f: # 打开文件
>>> while True:
>>> chunk = urlhandler.read(1024)
>>> if not chunk: break
>>> f.write(chunk)
>>> if __name__ == "__main__":
>>> urls = [ "http://wiki.python.org/moin/WebProgramming",
>>> "https://www.createspace.com/3611970",
>>> "http ://wiki.python.org/moin/Documentation"
>>> ]
>>> queue = queue.Queue()
>>> # create a thread pool and give them a queue
>>> for i in range(5):
>>> t=DownloadThread(queue) #启动5个技程同时进行下4E
>>> t.setDaemon(True)
>>> t.start()
>>> # giva the queue some data
>>> for url in urls:
>>> queue.put(url)
>>> # wait for the queue to finish
>>> queue.join()
/tmp/ipykernel_20318/3794936594.py:34: DeprecationWarning: setDaemon() is deprecated, set the daemon attribute instead
t.setDaemon(True)
Exception in thread Thread-10:
Traceback (most recent call last):
File "/usr/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
self.run()
File "/tmp/ipykernel_20318/3794936594.py", line 14, in run
File "/tmp/ipykernel_20318/3794936594.py", line 18, in download_file
File "/usr/lib/python3.11/urllib/request.py", line 216, in urlopen
return opener.open(url, data, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/urllib/request.py", line 519, in open
response = self._open(req, data)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/urllib/request.py", line 541, in _open
return self._call_chain(self.handle_open, 'unknown',
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/urllib/request.py", line 496, in _call_chain
result = func(*args)
^^^^^^^^^^^
File "/usr/lib/python3.11/urllib/request.py", line 1419, in unknown_open
raise URLError('unknown url type: %s' % type)
urllib.error.URLError: <urlopen error unknown url type: http >
Thread-8begin downloadhttp://wiki.python.org/moin/WebProgramming...
Thread-9begin downloadhttps://www.createspace.com/3611970...
Thread-10begin downloadhttp ://wiki.python.org/moin/Documentation...
Thread-8download comleted!!!