第21章-穿线模块¶
python有许多不同的并发构造,如线程、队列和多处理。线程模块曾经是实现并发性的主要方式。几年前,多处理模块被添加到标准库的python套件中。本章将重点介绍如何使用线程和队列。
使用线程¶
我们将从一个简单的示例开始,演示线程如何工作。我们将把 线程 类并使其将名称打印到stdout。让我们开始编码吧!
import random
import time
from threading import Thread
class MyThread(Thread):
"""
A threading example
"""
def __init__(self, name):
"""Initialize the thread"""
Thread.__init__(self)
self.name = name
def run(self):
"""Run the thread"""
amount = random.randint(3, 15)
time.sleep(amount)
msg = "%s is running" % self.name
print(msg)
def create_threads():
"""
Create a group of threads
"""
for i in range(5):
name = "Thread #%s" % (i+1)
my_thread = MyThread(name)
my_thread.start()
if __name__ == "__main__":
create_threads()
在上面的代码中,我们导入python的 随机的 模块 time 模块和我们导入 线程 类从 线程加工 模块。接下来我们子类线程并使其重写 __init__ 方法接受我们标记为“name”的参数。要启动线程,必须调用 开始() 方法。当启动线程时,它将自动调用线程的 run 方法。我们已经重写了线程的run方法,让它选择随机的睡眠时间。这个 random.randint 这里的示例将导致python从3-15中随机选择一个数字。然后我们让线程休眠的秒数,我们只是随机选择来模拟它实际执行的操作。最后,我们打印出线程的名称,让用户知道线程已经完成。
create_threads函数将创建5个线程,每个线程都有一个唯一的名称。如果运行此代码,您应该看到如下内容:
Thread #2 is running
Thread #3 is running
Thread #1 is running
Thread #4 is running
Thread #5 is running
每次输出的顺序都不同。尝试运行代码几次以查看订单更改。现在让我们写一些更实用的东西!
编写线程下载程序¶
前一个例子除了作为一个解释线程如何工作的工具外,并不是很有用。所以在这个例子中,我们将创建一个线程类,可以从Internet下载文件。美国国税局(U.S.InternalRevenueService)有许多PDF格式的表格,供公民用于缴税。我们将使用这个免费资源进行演示。代码如下:
# Python 2 version
import os
import urllib2
from threading import Thread
class DownloadThread(Thread):
"""
A threading example that can download a file
"""
def __init__(self, url, name):
"""Initialize the thread"""
Thread.__init__(self)
self.name = name
self.url = url
def run(self):
"""Run the thread"""
handle = urllib2.urlopen(self.url)
fname = os.path.basename(self.url)
with open(fname, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
msg = "%s has finished downloading %s!" % (self.name,
self.url)
print(msg)
def main(urls):
"""
Run the program
"""
for item, url in enumerate(urls):
name = "Thread %s" % (item+1)
thread = DownloadThread(url, name)
thread.start()
if __name__ == "__main__":
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
main(urls)
这基本上是对第一个脚本的完全重写。在本例中,我们导入OS和URLLIB2模块以及线程模块。我们将使用URLLIB2在线程类中进行实际下载。操作系统模块用于提取正在下载的文件名,以便我们可以使用它在计算机上创建同名文件。在DownloadThread类中,我们设置了 __init__ 接受线程的URL和名称。在run方法中,我们打开URL,提取文件名,然后使用该文件名在磁盘上命名/创建文件。然后我们使用 while 循环一次下载一个千字节的文件并将其写入磁盘。一旦文件完成保存,我们就打印出线程的名称和下载完成的URL。
代码的python 3版本略有不同。您必须导入 urllib 而不是 乌尔利2 使用 urllib.request.urlopen 而不是 urllib2.urlopen .下面是代码,您可以看到不同之处:
# Python 3 version
import os
import urllib.request
from threading import Thread
class DownloadThread(Thread):
"""
A threading example that can download a file
"""
def __init__(self, url, name):
"""Initialize the thread"""
Thread.__init__(self)
self.name = name
self.url = url
def run(self):
"""Run the thread"""
handle = urllib.request.urlopen(self.url)
fname = os.path.basename(self.url)
with open(fname, "wb") as f_handler:
while True:
chunk = handle.read(1024)
if not chunk:
break
f_handler.write(chunk)
msg = "%s has finished downloading %s!" % (self.name,
self.url)
print(msg)
def main(urls):
"""
Run the program
"""
for item, url in enumerate(urls):
name = "Thread %s" % (item+1)
thread = DownloadThread(url, name)
thread.start()
if __name__ == "__main__":
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
main(urls)
使用队列¶
队列可以用于先进先出(FIFO)或后进后出(LILO)类似堆栈的实现,如果您直接使用它们的话。在本节中,我们将混合线程并创建一个简单的文件下载器脚本,以演示队列如何在需要并发的情况下工作。
为了帮助解释队列如何工作,我们将重写上一节中的下载脚本以使用队列。我们开始吧!
import os
import threading
import urllib.request
from queue import Queue
class Downloader(threading.Thread):
"""Threaded File Downloader"""
def __init__(self, queue):
"""Initialize the thread"""
threading.Thread.__init__(self)
self.queue = queue
def run(self):
"""Run the thread"""
while True:
# gets the url from the queue
url = self.queue.get()
# download the file
self.download_file(url)
# send a signal to the queue that the job is done
self.queue.task_done()
def download_file(self, url):
"""Download the file"""
handle = urllib.request.urlopen(url)
fname = os.path.basename(url)
with open(fname, "wb") as f:
while True:
chunk = handle.read(1024)
if not chunk: break
f.write(chunk)
def main(urls):
"""
Run the program
"""
queue = Queue()
# create a thread pool and give them a queue
for i in range(5):
t = Downloader(queue)
t.setDaemon(True)
t.start()
# give the queue some data
for url in urls:
queue.put(url)
# wait for the queue to finish
queue.join()
if __name__ == "__main__":
urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
"http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
main(urls)
让我们把这个分解一下。首先,我们需要查看主函数定义,以了解这一切是如何流动的。这里我们看到它接受一个URL列表。然后,主函数创建一个队列实例,它将传递给5个后台监控线程。守护进程线程和非守护进程线程的主要区别在于,您必须跟踪非守护进程线程并自己关闭它们,而对于守护进程线程,您基本上只需设置它们并忽略它们,当应用程序关闭时,它们也会关闭。接下来,我们使用传入的URL加载队列(使用其put方法)。
最后,我们告诉队列等待线程通过join方法进行处理。在下载类中,我们有一行 self.queue.get()。 直到队列有返回的内容。这意味着这些线只是懒散地坐着等着捡东西。这也意味着对于一个线程来说 get 队列中的某个内容,它必须调用队列的 get 方法。因此,当我们在队列中添加或放置项目时,线程池将 get 项目和处理它们。这也被称为 排液 .处理完队列中的所有项目后,脚本结束并退出。在我的机器上,它在一秒钟内下载所有5个文档。
总结¶
现在您知道了如何在理论上和实际中使用线程和队列。当您创建用户界面并希望保持界面可用时,线程尤其有用。如果没有线程,用户界面将变得没有响应,并且在您下载大型文件或对数据库执行大型查询时,用户界面似乎会挂起。为了防止这种情况发生,您在线程中执行长时间运行的进程,然后在完成后与接口进行通信。