Go 語(yǔ)言 分布式爬蟲

2023-03-22 15:05 更新

原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch6-cloud/ch6-07-crawler.html


6.7 分布式爬蟲

互聯(lián)網(wǎng)時(shí)代的信息爆炸是很多人倍感頭痛的問題,應(yīng)接不暇的新聞、信息、視頻,無(wú)孔不入地侵占著我們的碎片時(shí)間。但另一方面,在我們真正需要數(shù)據(jù)的時(shí)候,卻感覺數(shù)據(jù)并不是那么容易獲取的。比如我們想要分析現(xiàn)在人在討論些什么,關(guān)心些什么。甚至有時(shí)候,可能我們只是暫時(shí)沒有時(shí)間去一一閱覽心儀的小說,但又想能用技術(shù)手段把它們存在自己的資料庫(kù)里。哪怕是幾個(gè)月或一年后再來(lái)回顧。再或者我們想要把互聯(lián)網(wǎng)上這些稍縱即逝的有用信息保存起來(lái),例如某個(gè)非常小的論壇中聚集的同好們的高質(zhì)量討論,在未來(lái)某個(gè)時(shí)刻,即使這些小眾的聚集區(qū)無(wú)以為繼時(shí),依然能讓我們從硬盤中翻出當(dāng)初珍貴的觀點(diǎn)來(lái)。

除去情懷需求,互聯(lián)網(wǎng)上有大量珍貴的開放資料,近年來(lái)深度學(xué)習(xí)如雨后春筍一般火熱起來(lái),但機(jī)器學(xué)習(xí)很多時(shí)候并不是苦于我的模型是否建立得合適,我的參數(shù)是否調(diào)整得正確,而是苦于最初的起步階段:沒有數(shù)據(jù)。

作為收集數(shù)據(jù)的前置工作,有能力去寫一個(gè)簡(jiǎn)單的或者復(fù)雜的爬蟲,對(duì)于我們來(lái)說依然非常重要。

6.7.1 基于 colly 的單機(jī)爬蟲

《Go 語(yǔ)言編程》一書給出了簡(jiǎn)單的爬蟲示例,經(jīng)過了多年的發(fā)展,現(xiàn)在使用 Go 語(yǔ)言寫一個(gè)網(wǎng)站的爬蟲要更加方便,比如用 colly 來(lái)實(shí)現(xiàn)爬取某網(wǎng)站(虛擬站點(diǎn),這里用 abcdefg 作為占位符)在 Go 語(yǔ)言標(biāo)簽下的前十頁(yè)內(nèi)容:

package main

import (
    "fmt"
    "regexp"
    "time"

    "github.com/gocolly/colly"
)

var visited = map[string]bool{}

func main() {
    // Instantiate default collector
    c := colly.NewCollector(
        colly.AllowedDomains("www.abcdefg.com"),
        colly.MaxDepth(1),
    )

    // 我們認(rèn)為匹配該模式的是該網(wǎng)站的詳情頁(yè)
    detailRegex, _ := regexp.Compile(`/go/go\?p=\d+
)
    // 匹配下面模式的是該網(wǎng)站的列表頁(yè)
    listRegex, _ := regexp.Compile(`/t/\d+#\w+`)

    // 所有 a 標(biāo)簽,上設(shè)置回調(diào)函數(shù)
    c.OnHTML("a[href]", func(e *colly.HTMLElement) {
        link := e.Attr("href")

        // 已訪問過的詳情頁(yè)或列表頁(yè),跳過
        if visited[link] && (detailRegex.Match([]byte(link)) || listRegex.Match([]byte(link))) {
            return
        }

        // 既不是列表頁(yè),也不是詳情頁(yè)
        // 那么不是我們關(guān)心的內(nèi)容,要跳過
        if !detailRegex.Match([]byte(link)) && !listRegex.Match([]byte(link)) {
            println("not match", link)
            return
        }

        // 因?yàn)榇蠖鄶?shù)網(wǎng)站有反爬蟲策略
        // 所以爬蟲邏輯中應(yīng)該有 sleep 邏輯以避免被封殺
        time.Sleep(time.Second)
        println("match", link)

        visited[link] = true

        time.Sleep(time.Millisecond * 2)
        c.Visit(e.Request.AbsoluteURL(link))
    })

    err := c.Visit("https://www.abcdefg.com/go/go")
    if err != nil {fmt.Println(err)}
}

6.7.2 分布式爬蟲

想像一下,你們的信息分析系統(tǒng)運(yùn)行非常之快。獲取信息的速度成為了瓶頸,雖然可以用上 Go 語(yǔ)言所有優(yōu)秀的并發(fā)特性,將單機(jī)的 CPU 和網(wǎng)絡(luò)帶寬都用滿,但還是希望能夠加快爬蟲的爬取速度。在很多場(chǎng)景下,速度是有意義的:

  1. 對(duì)于價(jià)格戰(zhàn)期間的電商們來(lái)說,希望能夠在對(duì)手價(jià)格變動(dòng)后第一時(shí)間獲取到其最新價(jià)格,再靠機(jī)器自動(dòng)調(diào)整本家的商品價(jià)格。
  2. 對(duì)于類似頭條之類的 Feed 流業(yè)務(wù),信息的時(shí)效性也非常重要。如果我們慢吞吞地爬到的新聞是昨天的新聞,那對(duì)于用戶來(lái)說就沒有任何意義。

所以我們需要分布式爬蟲。從本質(zhì)上來(lái)講,分布式爬蟲是一套任務(wù)分發(fā)和執(zhí)行系統(tǒng)。而常見的任務(wù)分發(fā),因?yàn)樯舷掠未嬖谒俣炔黄ヅ鋯栴},必然要借助消息隊(duì)列。


圖 6-14 爬蟲工作流程

上游的主要工作是根據(jù)預(yù)先配置好的起點(diǎn)來(lái)爬取所有的目標(biāo) “列表頁(yè)”,列表頁(yè)的 html 內(nèi)容中會(huì)包含有所有詳情頁(yè)的鏈接。詳情頁(yè)的數(shù)量一般是列表頁(yè)的 10 到 100 倍,所以我們將這些詳情頁(yè)鏈接作為“任務(wù)” 內(nèi)容,通過消息隊(duì)列分發(fā)出去。

針對(duì)頁(yè)面爬取來(lái)說,在執(zhí)行時(shí)是否偶爾會(huì)有重復(fù)其實(shí)不太重要,因?yàn)槿蝿?wù)結(jié)果是冪等的(這里我們只爬頁(yè)面內(nèi)容,不考慮評(píng)論部分)。

本節(jié)我們來(lái)簡(jiǎn)單實(shí)現(xiàn)一個(gè)基于消息隊(duì)列的爬蟲,本節(jié)我們使用 nats 來(lái)做任務(wù)分發(fā)。實(shí)際開發(fā)中,應(yīng)該針對(duì)自己的業(yè)務(wù)對(duì)消息本身的可靠性要求和公司的基礎(chǔ)架構(gòu)組件情況進(jìn)行選型。

6.7.2.1 nats 簡(jiǎn)介

nats 是 Go 實(shí)現(xiàn)的一個(gè)高性能分布式消息隊(duì)列,適用于高并發(fā)高吞吐量的消息分發(fā)場(chǎng)景。早期的 nats 以速度為重,沒有支持持久化。從 16 年開始,nats 通過 nats-streaming 支持基于日志的持久化,以及可靠的消息傳輸。為了演示方便,我們本節(jié)中只使用 nats。

nats 的服務(wù)端項(xiàng)目是 gnatsd,客戶端與 gnatsd 的通信方式為基于 tcp 的文本協(xié)議,非常簡(jiǎn)單:

向 subject 為 task 發(fā)消息:


圖 6-15 nats 協(xié)議中的 pub

以 workers 的 queue 從 tasks subject 訂閱消息:


圖 6-16 nats 協(xié)議中的 sub

其中的 queue 參數(shù)是可選的,如果希望在分布式的消費(fèi)端進(jìn)行任務(wù)的負(fù)載均衡,而不是所有人都收到同樣的消息,那么就要給消費(fèi)端指定相同的 queue 名字。

基本消息生產(chǎn)

生產(chǎn)消息只要指定 subject 即可:

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {return}

// 指定 subject 為 tasks,消息內(nèi)容隨意
err = nc.Publish("tasks", []byte("your task content"))

nc.Flush()
基本消息消費(fèi)

直接使用 nats 的 subscribe API 并不能達(dá)到任務(wù)分發(fā)的目的,因?yàn)?pub sub 本身是廣播性質(zhì)的。所有消費(fèi)者都會(huì)收到完全一樣的所有消息。

除了普通的 subscribe 之外,nats 還提供了 queue subscribe 的功能。只要提供一個(gè) queue group 名字(類似 Kafka 中的 consumer group),即可均衡地將任務(wù)分發(fā)給消費(fèi)者。

nc, err := nats.Connect(nats.DefaultURL)
if err != nil {return}

// queue subscribe 相當(dāng)于在消費(fèi)者之間進(jìn)行任務(wù)分發(fā)的分支均衡
// 前提是所有消費(fèi)者都使用 workers 這個(gè) queue
// nats 中的 queue 概念上類似于 Kafka 中的 consumer group
sub, err := nc.QueueSubscribeSync("tasks", "workers")
if err != nil {return}

var msg *nats.Msg
for {
    msg, err = sub.NextMsg(time.Hour * 10000)
    if err != nil {break}
    // 正確地消費(fèi)到了消息
    // 可用 nats.Msg 對(duì)象處理任務(wù)
}

6.7.3 結(jié)合 nats 和 colly 的消息生產(chǎn)

我們?yōu)槊恳粋€(gè)網(wǎng)站定制一個(gè)對(duì)應(yīng)的 collector,并設(shè)置相應(yīng)的規(guī)則,比如 abcdefg,hijklmn(虛構(gòu)的),再用簡(jiǎn)單的工廠方法來(lái)將該 collector 和其 host 對(duì)應(yīng)起來(lái),每個(gè)站點(diǎn)爬到列表頁(yè)之后,需要在當(dāng)前程序中把所有鏈接解析出來(lái),并把落地頁(yè)的 URL 發(fā)往消息隊(duì)列。

package main

import (
    "fmt"
    "net/url"

    "github.com/gocolly/colly"
)

var domain2Collector = map[string]*colly.Collector{}
var nc *nats.Conn
var maxDepth = 10
var natsURL = "nats://localhost:4222"

func factory(urlStr string) *colly.Collector {
    u, _ := url.Parse(urlStr)
    return domain2Collector[u.Host]
}

func initABCDECollector() *colly.Collector {
    c := colly.NewCollector(
        colly.AllowedDomains("www.abcdefg.com"),
        colly.MaxDepth(maxDepth),
    )

    c.OnResponse(func(resp *colly.Response) {
        // 做一些爬完之后的善后工作
        // 比如頁(yè)面已爬完的確認(rèn)存進(jìn) MySQL
    })

    c.OnHTML("a[href]", func(e *colly.HTMLElement) {
        // 基本的反爬蟲策略
        link := e.Attr("href")
        time.Sleep(time.Second * 2)

        // 正則 match 列表頁(yè)的話,就 visit
        if listRegex.Match([]byte(link)) {
            c.Visit(e.Request.AbsoluteURL(link))
        }
        // 正則 match 落地頁(yè)的話,就發(fā)消息隊(duì)列
        if detailRegex.Match([]byte(link)) {
            err = nc.Publish("tasks", []byte(link))
            nc.Flush()
        }
    })
    return c
}

func initHIJKLCollector() *colly.Collector {
    c := colly.NewCollector(
        colly.AllowedDomains("www.hijklmn.com"),
        colly.MaxDepth(maxDepth),
    )

    c.OnHTML("a[href]", func(e *colly.HTMLElement) {
    })

    return c
}

func init() {
    domain2Collector["www.abcdefg.com"] = initABCDECollector()
    domain2Collector["www.hijklmn.com"] = initHIJKLCollector()
    var err error
    nc, err = nats.Connect(natsURL)
    if err != nil {os.Exit(1)}
}

func main() {
    urls := []string{"https://www.abcdefg.com", "https://www.hijklmn.com"}
    for _, url := range urls {
        instance := factory(url)
        instance.Visit(url)
    }
}

6.7.4 結(jié)合 colly 的消息消費(fèi)

消費(fèi)端就簡(jiǎn)單一些了,我們只需要訂閱對(duì)應(yīng)的主題,并直接訪問網(wǎng)站的詳情頁(yè) (落地頁(yè)) 即可。

package main

import (
    "fmt"
    "net/url"

    "github.com/gocolly/colly"
)

var domain2Collector = map[string]*colly.Collector{}
var nc *nats.Conn
var maxDepth = 10
var natsURL = "nats://localhost:4222"

func factory(urlStr string) *colly.Collector {
    u, _ := url.Parse(urlStr)
    return domain2Collector[u.Host]
}

func initV2exCollector() *colly.Collector {
    c := colly.NewCollector(
        colly.AllowedDomains("www.abcdefg.com"),
        colly.MaxDepth(maxDepth),
    )
    return c
}

func initV2fxCollector() *colly.Collector {
    c := colly.NewCollector(
        colly.AllowedDomains("www.hijklmn.com"),
        colly.MaxDepth(maxDepth),
    )
    return c
}

func init() {
    domain2Collector["www.abcdefg.com"] = initV2exCollector()
    domain2Collector["www.hijklmn.com"] = initV2fxCollector()

    var err error
    nc, err = nats.Connect(natsURL)
    if err != nil {os.Exit(1)}
}

func startConsumer() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {return}

    sub, err := nc.QueueSubscribeSync("tasks", "workers")
    if err != nil {return}

    var msg *nats.Msg
    for {
        msg, err = sub.NextMsg(time.Hour * 10000)
        if err != nil {break}

        urlStr := string(msg.Data)
        ins := factory(urlStr)
        // 因?yàn)樽钕掠文玫降囊欢ㄊ菍?duì)應(yīng)網(wǎng)站的落地頁(yè)
        // 所以不用進(jìn)行多余的判斷了,直接爬內(nèi)容即可
        ins.Visit(urlStr)
        // 防止被封殺
        time.Sleep(time.Second)
    }
}

func main() {
    startConsumer()
}

從代碼層面上來(lái)講,這里的生產(chǎn)者和消費(fèi)者其實(shí)本質(zhì)上差不多。如果日后我們要靈活地支持增加、減少各種網(wǎng)站的爬取的話,應(yīng)該思考如何將這些爬蟲的策略、參數(shù)盡量地配置化。

在本章的分布式配置一節(jié)中已經(jīng)講了一些配置系統(tǒng)的使用,讀者可以自行進(jìn)行嘗試,這里就不再贅述了。



以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)