第21章-穿线模块

python有许多不同的并发构造,如线程、队列和多处理。线程模块曾经是实现并发性的主要方式。几年前,多处理模块被添加到标准库的python套件中。本章将重点介绍如何使用线程和队列。

使用线程

我们将从一个简单的示例开始,演示线程如何工作。我们将把 线程 类并使其将名称打印到stdout。让我们开始编码吧!

import random
import time

from threading import Thread

class MyThread(Thread):
    """
    A threading example
    """

    def __init__(self, name):
        """Initialize the thread"""
        Thread.__init__(self)
        self.name = name

    def run(self):
        """Run the thread"""
        amount = random.randint(3, 15)
        time.sleep(amount)
        msg = "%s is running" % self.name
        print(msg)

def create_threads():
    """
    Create a group of threads
    """
    for i in range(5):
        name = "Thread #%s" % (i+1)
        my_thread = MyThread(name)
        my_thread.start()

if __name__ == "__main__":
    create_threads()

在上面的代码中,我们导入python的 随机的 模块 time 模块和我们导入 线程 类从 线程加工 模块。接下来我们子类线程并使其重写 __init__ 方法接受我们标记为“name”的参数。要启动线程,必须调用 开始() 方法。当启动线程时,它将自动调用线程的 run 方法。我们已经重写了线程的run方法,让它选择随机的睡眠时间。这个 random.randint 这里的示例将导致python从3-15中随机选择一个数字。然后我们让线程休眠的秒数,我们只是随机选择来模拟它实际执行的操作。最后,我们打印出线程的名称,让用户知道线程已经完成。

create_threads函数将创建5个线程,每个线程都有一个唯一的名称。如果运行此代码,您应该看到如下内容:

Thread #2 is running
Thread #3 is running
Thread #1 is running
Thread #4 is running
Thread #5 is running

每次输出的顺序都不同。尝试运行代码几次以查看订单更改。现在让我们写一些更实用的东西!

编写线程下载程序

前一个例子除了作为一个解释线程如何工作的工具外,并不是很有用。所以在这个例子中,我们将创建一个线程类,可以从Internet下载文件。美国国税局(U.S.InternalRevenueService)有许多PDF格式的表格,供公民用于缴税。我们将使用这个免费资源进行演示。代码如下:

# Python 2 version

import os
import urllib2

from threading import Thread

class DownloadThread(Thread):
    """
    A threading example that can download a file
    """

    def __init__(self, url, name):
        """Initialize the thread"""
        Thread.__init__(self)
        self.name = name
        self.url = url

    def run(self):
        """Run the thread"""
        handle = urllib2.urlopen(self.url)
        fname = os.path.basename(self.url)
        with open(fname, "wb") as f_handler:
            while True:
                chunk = handle.read(1024)
                if not chunk:
                    break
                f_handler.write(chunk)
        msg = "%s has finished downloading %s!" % (self.name,
                                                   self.url)
        print(msg)

def main(urls):
    """
    Run the program
    """
    for item, url in enumerate(urls):
        name = "Thread %s" % (item+1)
        thread = DownloadThread(url, name)
        thread.start()

if __name__ == "__main__":
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    main(urls)

这基本上是对第一个脚本的完全重写。在本例中,我们导入OS和URLLIB2模块以及线程模块。我们将使用URLLIB2在线程类中进行实际下载。操作系统模块用于提取正在下载的文件名,以便我们可以使用它在计算机上创建同名文件。在DownloadThread类中,我们设置了 __init__ 接受线程的URL和名称。在run方法中,我们打开URL,提取文件名,然后使用该文件名在磁盘上命名/创建文件。然后我们使用 while 循环一次下载一个千字节的文件并将其写入磁盘。一旦文件完成保存,我们就打印出线程的名称和下载完成的URL。

代码的python 3版本略有不同。您必须导入 urllib 而不是 乌尔利2 使用 urllib.request.urlopen 而不是 urllib2.urlopen .下面是代码,您可以看到不同之处:

# Python 3 version

import os
import urllib.request

from threading import Thread

class DownloadThread(Thread):
    """
    A threading example that can download a file
    """

    def __init__(self, url, name):
        """Initialize the thread"""
        Thread.__init__(self)
        self.name = name
        self.url = url

    def run(self):
        """Run the thread"""
        handle = urllib.request.urlopen(self.url)
        fname = os.path.basename(self.url)
        with open(fname, "wb") as f_handler:
            while True:
                chunk = handle.read(1024)
                if not chunk:
                    break
                f_handler.write(chunk)
        msg = "%s has finished downloading %s!" % (self.name,
                                                   self.url)
        print(msg)

def main(urls):
    """
    Run the program
    """
    for item, url in enumerate(urls):
        name = "Thread %s" % (item+1)
        thread = DownloadThread(url, name)
        thread.start()

if __name__ == "__main__":
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    main(urls)

使用队列

队列可以用于先进先出(FIFO)或后进后出(LILO)类似堆栈的实现,如果您直接使用它们的话。在本节中,我们将混合线程并创建一个简单的文件下载器脚本,以演示队列如何在需要并发的情况下工作。

为了帮助解释队列如何工作,我们将重写上一节中的下载脚本以使用队列。我们开始吧!

import os
import threading
import urllib.request

from queue import Queue

class Downloader(threading.Thread):
    """Threaded File Downloader"""

    def __init__(self, queue):
        """Initialize the thread"""
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        """Run the thread"""
        while True:
            # gets the url from the queue
            url = self.queue.get()

            # download the file
            self.download_file(url)

            # send a signal to the queue that the job is done
            self.queue.task_done()

    def download_file(self, url):
        """Download the file"""
        handle = urllib.request.urlopen(url)
        fname = os.path.basename(url)
        with open(fname, "wb") as f:
            while True:
                chunk = handle.read(1024)
                if not chunk: break
                f.write(chunk)

def main(urls):
    """
    Run the program
    """
    queue = Queue()

    # create a thread pool and give them a queue
    for i in range(5):
        t = Downloader(queue)
        t.setDaemon(True)
        t.start()

    # give the queue some data
    for url in urls:
        queue.put(url)

    # wait for the queue to finish
    queue.join()

if __name__ == "__main__":
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",
            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    main(urls)

让我们把这个分解一下。首先,我们需要查看主函数定义,以了解这一切是如何流动的。这里我们看到它接受一个URL列表。然后,主函数创建一个队列实例,它将传递给5个后台监控线程。守护进程线程和非守护进程线程的主要区别在于,您必须跟踪非守护进程线程并自己关闭它们,而对于守护进程线程,您基本上只需设置它们并忽略它们,当应用程序关闭时,它们也会关闭。接下来,我们使用传入的URL加载队列(使用其put方法)。

最后,我们告诉队列等待线程通过join方法进行处理。在下载类中,我们有一行 self.queue.get()。 直到队列有返回的内容。这意味着这些线只是懒散地坐着等着捡东西。这也意味着对于一个线程来说 get 队列中的某个内容,它必须调用队列的 get 方法。因此,当我们在队列中添加或放置项目时,线程池将 get 项目和处理它们。这也被称为 排液 .处理完队列中的所有项目后,脚本结束并退出。在我的机器上,它在一秒钟内下载所有5个文档。

总结

现在您知道了如何在理论上和实际中使用线程和队列。当您创建用户界面并希望保持界面可用时,线程尤其有用。如果没有线程,用户界面将变得没有响应,并且在您下载大型文件或对数据库执行大型查询时,用户界面似乎会挂起。为了防止这种情况发生,您在线程中执行长时间运行的进程,然后在完成后与接口进行通信。