App下載

使用 Python 進行實用的線程編程

花式作死冠軍 2021-09-13 14:13:25 瀏覽數(shù) (2679)
反饋

介紹

Python 不乏并發(fā)選項,標準庫包括對線程、進程和異步 I/O 的支持。在許多情況下,Python 通過創(chuàng)建異步、線程和子進程等高級模塊,消除了使用這些各種并發(fā)方法的困難。在標準庫之外,還有第三種解決方案,例如twisted、stackless 和處理模塊,僅舉幾例。本文使用實踐示例專門關(guān)注 Python 中的線程處理。網(wǎng)上有很多很好的資源來記錄線程 API,但本文試圖提供常見線程使用模式的實踐示例。

首先定義進程和線程之間的區(qū)別很重要。線程與進程的不同之處在于它們共享狀態(tài)、內(nèi)存和資源。這個簡單的區(qū)別對于線程來說既是優(yōu)點也是缺點。一方面,線程是輕量級的并且易于通信,但另一方面,它們帶來了一系列問題,包括死鎖、競爭條件和純粹的復雜性。幸運的是,由于 GIL 和排隊模塊,Python 中的線程實現(xiàn)起來比其他語言要簡單得多。

你好 Python 線程

接下來,我假設你已經(jīng)安裝了 Python 2.5 或更高版本,因為許多示例將使用 Python 語言的更新功能,這些功能至少出現(xiàn)在 Python2.5 中。要開始使用 Python 中的線程,我們將從一個簡單的“Hello World”示例開始:

清單 1. hello_threads_example
        import threading
        import datetime
        
        class ThreadClass(threading.Thread):
          def run(self):
            now = datetime.datetime.now()
            print "%s says Hello World at time: %s" % 
            (self.getName(), now)
        
        for i in range(2):
          t = ThreadClass()
          t.start()

如果你運行這個例子,你會得到以下輸出:

      #python hello_threads.py 
      Thread?1 says Hello World at time: 2008?05?13 13:22:50.252069
      Thread?2 says Hello World at time: 2008?05?13 13:22:50.252576

查看此輸出,你可以看到你收到了來自兩個帶有日期戳的線程的 Hello World 語句。如果你查看實際代碼,會發(fā)現(xiàn)有兩個 import 語句;一個導入 datetime 模塊,另一個導入 threading 模塊。該類ThreadClass繼承自threading.Thread,因此,您需要定義一個 run 方法來執(zhí)行您在線程內(nèi)運行的代碼。在 run 方法中唯一需要注意的重要事項self.getName()是該方法將標識線程的名稱。

最后三行代碼實際上調(diào)用了類并啟動了線程。如果您注意到,t.start()實際上是啟動線程的。線程模塊在設計時就考慮到了繼承性,實際上是建立在較低級別的線程模塊之上的。在大多數(shù)情況下,繼承自 被認為是最佳實踐threading.Thread,因為它為線程編程創(chuàng)建了一個非常自然的 API。

使用帶線程的隊列

正如我之前提到的,當線程需要共享數(shù)據(jù)或資源時,線程處理可能會很復雜。線程模塊確實提供了許多同步原語,包括信號量、條件變量、事件和鎖。雖然存在這些選項,但最好的做法是專注于使用隊列。隊列更容易處理,并使線程編程更加安全,因為它們有效地將所有對資源的訪問集中到單個線程,并允許更清晰、更易讀的設計模式。

在下一個示例中,你將首先創(chuàng)建一個程序,該程序?qū)⒁来位蛞粋€接一個地獲取網(wǎng)站的 URL,并打印出頁面的前 1024 個字節(jié)。這是使用線程可以更快地完成某些事情的經(jīng)典示例。首先,讓我們使用urllib2模塊一次抓取這些頁面,并對代碼進行計時:

清單 2. URL 獲取序列
        import urllib2
        import time
        
        hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"        
        start = time.time()
        #grabs urls of hosts and prints first 1024 bytes of page
        for host in hosts:
          url = urllib2.urlopen(host)
          print url.read(1024)
        
        print "Elapsed Time: %s" % (time.time() ? start)

當你運行它時,你會得到大量輸出到標準輸出,因為頁面被部分打印。但你會在最后得到這個:

        Elapsed Time: 2.40353488922

讓我們稍微看一下這段代碼。你只導入兩個模塊。首先,urllib2模塊是承擔重任并抓取網(wǎng)頁的東西。其次,你通過調(diào)用?time.time()?創(chuàng)建一個開始時間值,然后再次調(diào)用它并減去初始值以確定程序執(zhí)行所需的時間。最后,從程序的速度來看,“兩秒半”的結(jié)果并不可怕,但如果你有數(shù)百個網(wǎng)頁要檢索,考慮到當前的平均值,大約需要 50 秒。看看創(chuàng)建線程版本如何加快速度:

清單 3. URL 獲取線程
          #!/usr/bin/env python
          import Queue
          import threading
          import urllib2
          import time
          
          hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
          "http://ibm.com", "http://apple.com"          
          queue = Queue.Queue()
          
          class ThreadUrl(threading.Thread):
          """Threaded Url Grab"""
            def init(self, queue):
              threading.Thread.init(self)
              self.queue = queue
          
            def run(self):
              while True:
                #grabs host from queue
                host = self.queue.get()
            
                #grabs urls of hosts and prints first 1024 bytes of page
                url = urllib2.urlopen(host)
                print url.read(1024)
            
                #signals to queue job is done
                self.queue.task_done()
          
          start = time.time()
          def main():
          
            #spawn a pool of threads, and pass them queue instance 
            for i in range(5):
              t = ThreadUrl(queue)
              t.setDaemon(True)
              t.start()
              
           #populate queue with data   
              for host in hosts:
                queue.put(host)
           
           #wait on the queue until everything has been processed     
           queue.join()
          
          main()
          print "Elapsed Time: %s" % (time.time() ? start)

這個例子有更多的代碼需要解釋,但由于使用了排隊模塊,它并沒有比第一個線程示例復雜多少。這種模式是在 Python 中使用線程的一種非常常見且推薦的方式。步驟描述如下:

  1. 創(chuàng)建一個?Queue.Queue()?實例,然后用數(shù)據(jù)填充它。
  2. 將填充數(shù)據(jù)的實例傳遞到從?threading.Thread?繼承而創(chuàng)建的?Thread?類中。
  3. 產(chǎn)生一個守護線程池。
  4. 一次從隊列中拉出一項,并在線程內(nèi)部使用該數(shù)據(jù)(即 run 方法)來完成這項工作。
  5. 工作完成后,向?queue.task_done()?隊列發(fā)送任務已完成的信號。
  6. 加入隊列,這實際上意味著等到隊列為空,然后退出主程序。

關(guān)于此模式的注意事項:通過將守護線程設置為 true,它允許主線程或程序在只有守護線程處于活動狀態(tài)時退出。這創(chuàng)建了一種控制程序流程的簡單方法,因為你可以在退出之前加入隊列,或等到隊列為空。確切的過程在隊列模塊的文檔中得到了最好的描述,如右側(cè)的資源部分所示:

join()
阻塞,直到隊列中的所有項目都被獲取和處理。每當將項目添加到隊列時,未完成任務的計數(shù)就會增加。每當使用者線程調(diào)用 task_done() 以指示該項目已被檢索并且其上的所有工作已完成時,未完成任務的計數(shù)就會下降。當未完成任務的數(shù)量降至零時, join()解鎖。

使用多個隊列

因為上面演示的模式非常有效,所以通過將額外的線程池與隊列鏈接來擴展它是相對簡單的。在上面的示例中,你只是打印出網(wǎng)頁的第一部分。下一個示例返回每個線程抓取的整個網(wǎng)頁,然后將其放入另一個隊列。然后設置另一個加入第二個隊列的線程池,然后在網(wǎng)頁上工作。本示例中執(zhí)行的工作涉及使用名為 Beautiful Soup 的第三方 Python 模塊解析網(wǎng)頁。僅使用幾行代碼,使用此模塊,你將提取標題標簽并為你訪問的每個頁面打印出來。

清單 4. 多隊列數(shù)據(jù)挖掘網(wǎng)站
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"
queue = Queue.Queue()
outqueue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def init(self, queue, outqueue):
        threading.Thread.init(self)
        self.queue = queue
        self.outqueue = outqueue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init(self, out_queue):
        threading.Thread.__init(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()
def main():

    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() ? start)

如果你運行此版本的腳本,你將獲得以下輸出:

  #python url_fetch_threaded_part2.py 

  <title>Google</title>  <title>Yahoo!</title>  <title>Apple</title>  <title>IBM United States</title>  <title>Amazon.com: Online Shopping for Electronics, Apparel,
 Computers, Books, DVDs & more</title>  Elapsed Time: 3.75387597084

在查看代碼時,你可以看到我們添加了另一個隊列實例,然后將該隊列傳遞給第一個線程池類ThreadURL. 接下來,你幾乎為下一個線程池類復制了完全相同的結(jié)構(gòu)DatamineThread。在這個類的run方法中,從每個線程的隊列中抓取網(wǎng)頁,chunk,然后用Beautiful Soup處理這個chunk。在這種情況下, 你可以使用 Beautiful Soup 來簡單地從每個頁面中提取標題標簽并打印出來。這個例子可以很容易地變成更有用的東西,因為你擁有基本搜索引擎或數(shù)據(jù)挖掘工具的核心。一個想法是使用 Beautiful Soup 從每個頁面中提取鏈接,然后關(guān)注它們。

總結(jié)

本文探討了 Python 中的線程,并展示了使用隊列來減輕復雜性和細微錯誤以及提高可讀代碼的最佳實踐。雖然這個基本模式相對簡單,但它可以通過將隊列和線程池鏈接在一起來解決大量問題。在最后一部分,您開始探索創(chuàng)建一個更復雜的處理管道,作為未來項目的模型。在資源部分有很多關(guān)于并發(fā)和線程的優(yōu)秀資源。

最后,重要的是要指出線程并不是所有問題的解決方案,而且進程可以非常適合許多情況。如果你只需要分叉多個進程并監(jiān)聽響應,那么標準庫 ??subprocess 模塊尤其可以更簡單地處理。


0 人點贊