Go 語言 分布式搜索引擎

2023-03-22 15:04 更新

原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch6-cloud/ch6-04-search-engine.html


6.4 分布式搜索引擎

在 Web 一章中,我們提到 MySQL 很脆弱。數(shù)據(jù)庫系統(tǒng)本身要保證實時和強一致性,所以其功能設(shè)計上都是為了滿足這種一致性需求。比如 write ahead log 的設(shè)計,基于 B + 樹實現(xiàn)的索引和數(shù)據(jù)組織,以及基于 MVCC 實現(xiàn)的事務(wù)等等。

關(guān)系型數(shù)據(jù)庫一般被用于實現(xiàn) OLTP 系統(tǒng),所謂 OLTP,援引 wikipedia:

在線交易處理(OLTP, Online transaction processing)是指透過信息系統(tǒng)、電腦網(wǎng)絡(luò)及數(shù)據(jù)庫,以線上交易的方式處理一般即時性的作業(yè)數(shù)據(jù),和更早期傳統(tǒng)數(shù)據(jù)庫系統(tǒng)大量批量的作業(yè)方式并不相同。OLTP 通常被運用于自動化的數(shù)據(jù)處理工作,如訂單輸入、金融業(yè)務(wù)… 等反復(fù)性的日常性交易活動。和其相對的是屬于決策分析層次的聯(lián)機分析處理(OLAP)。

在互聯(lián)網(wǎng)的業(yè)務(wù)場景中,也有一些實時性要求不高 (可以接受多秒的延遲),但是查詢復(fù)雜性卻很高的場景。舉個例子,在電商的 WMS 系統(tǒng)中,或者在大多數(shù)業(yè)務(wù)場景豐富的 CRM 或者客服系統(tǒng)中,可能需要提供幾十個字段的隨意組合查詢功能。這種系統(tǒng)的數(shù)據(jù)維度天生眾多,比如一個電商的 WMS 中對一件貨物的描述,可能有下面這些字段:

倉庫 id,入庫時間,庫位分區(qū) id,儲存貨架 id,入庫操作員 id,出庫操作員 id,庫存數(shù)量,過期時間,SKU 類型,產(chǎn)品品牌,產(chǎn)品分類,內(nèi)件數(shù)量

除了上述信息,如果商品在倉庫內(nèi)有流轉(zhuǎn)??赡苓€有有關(guān)聯(lián)的流程 id,當(dāng)前的流轉(zhuǎn)狀態(tài)等等。

想像一下,如果我們所經(jīng)營的是一個大型電商,每天有千萬級別的訂單,那么在這個數(shù)據(jù)庫中查詢和建立合適的索引都是一件非常難的事情。

在 CRM 或客服類系統(tǒng)中,常常有根據(jù)關(guān)鍵字進行搜索的需求,大型互聯(lián)網(wǎng)公司每天會接收數(shù)以萬計的用戶投訴。而考慮到事件溯源,用戶的投訴至少要存 2~3 年。又是千萬級甚至上億的數(shù)據(jù)。根據(jù)關(guān)鍵字進行一次 like 查詢,可能整個 MySQL 就直接掛掉了。

這時候我們就需要搜索引擎來救場了。

搜索引擎

Elasticsearch 是開源分布式搜索引擎的霸主,其依賴于 Lucene 實現(xiàn),在部署和運維方面做了很多優(yōu)化。當(dāng)今搭建一個分布式搜索引擎比起 Sphinx 的時代已經(jīng)是容易很多很多了。只要簡單配置客戶端 IP 和端口就可以了。

倒排列表

雖然 es 是針對搜索場景來定制的,但如前文所言,實際應(yīng)用中常常用 es 來作為 database 來使用,就是因為倒排列表的特性??梢杂帽容^樸素的觀點來理解倒排索引:


圖 6-10 倒排列表

對 Elasticsearch 中的數(shù)據(jù)進行查詢時,本質(zhì)就是求多個排好序的序列求交集。非數(shù)值類型字段涉及到分詞問題,大多數(shù)內(nèi)部使用場景下,我們可以直接使用默認的 bi-gram 分詞。什么是 bi-gram 分詞呢:

即將所有 Ti 和 T(i+1) 組成一個詞(在 Elasticsearch 中叫 term),然后再編排其倒排列表,這樣我們的倒排列表大概就是這樣的:


圖 6-11 “今天天氣很好” 的分詞結(jié)果

當(dāng)用戶搜索'天氣很好'時,其實就是求:天氣、氣很、很好三組倒排列表的交集,但這里的相等判斷邏輯有些特殊,用偽代碼表示一下:

func equal() {
    if postEntry.docID of '天氣' == postEntry.docID of '氣很' &&
        postEntry.offset + 1 of '天氣' == postEntry.offset of '氣很' {
            return true
    }

    if postEntry.docID of '氣很' == postEntry.docID of '很好' &&
        postEntry.offset + 1 of '氣很' == postEntry.offset of '很好' {
        return true
    }

    if postEntry.docID of '天氣' == postEntry.docID of '很好' &&
        postEntry.offset + 2 of '天氣' == postEntry.offset of '很好' {
        return true
    }

    return false
}

多個有序列表求交集的時間復(fù)雜度是:O(N*M),N 為給定列表當(dāng)中元素數(shù)最小的集合,M 為給定列表的個數(shù)。

在整個算法中起決定作用的一是最短的倒排列表的長度,其次是詞數(shù)總和,一般詞數(shù)不會很大(想像一下,你會在搜索引擎里輸入幾百字來搜索么?),所以起決定性作用的,一般是所有倒排列表中,最短的那一個的長度。

因此,文檔總數(shù)很多的情況下,搜索詞的倒排列表最短的那一個不長時,搜索速度也是很快的。如果用關(guān)系型數(shù)據(jù)庫,那就需要按照索引(如果有的話)來慢慢掃描了。

查詢 DSL

es 定義了一套查詢 DSL,當(dāng)我們把 es 當(dāng)數(shù)據(jù)庫使用時,需要用到其 bool 查詢。舉個例子:

{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "field_1": {
              "query": "1",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "field_2": {
              "query": "2",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "field_3": {
              "query": "3",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "field_4": {
              "query": "4",
              "type": "phrase"
            }
          }
        }
      ]
    }
  },
  "from": 0,
  "size": 1
}

看起來比較麻煩,但表達的意思很簡單:

if field_1 == 1 && field_2 == 2 && field_3 == 3 && field_4 == 4 {
    return true
}

用 bool should query 可以表示 or 的邏輯:

{
  "query": {
    "bool": {
      "should": [
        {
          "match": {
            "field_1": {
              "query": "1",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "field_2": {
              "query": "3",
              "type": "phrase"
            }
          }
        }
      ]
    }
  },
  "from": 0,
  "size": 1
}

這里表示的是類似:

if field_1 == 1 || field_2 == 2 {
    return true
}

這些 Go 代碼里 if 后面跟著的表達式在編程語言中有專有名詞來表達 Boolean Expression

4 > 1
5 == 2
3 <i && x> 10

es 的 Bool Query 方案,就是用 json 來表達了這種程序語言中的 Boolean Expression,為什么可以這么做呢?因為 json 本身是可以表達樹形結(jié)構(gòu)的,我們的程序代碼在被編譯器 parse 之后,也會變成 AST,而 AST 抽象語法樹,顧名思義,就是樹形結(jié)構(gòu)。理論上 json 能夠完備地表達一段程序代碼被 parse 之后的結(jié)果。這里的 Boolean Expression 被編譯器 Parse 之后也會生成差不多的樹形結(jié)構(gòu),而且只是整個編譯器實現(xiàn)的一個很小的子集。

基于 client SDK 做開發(fā)

初始化:

// 選用 elastic 版本時
// 注意與自己使用的 elasticsearch 要對應(yīng)
import (
    elastic "gopkg.in/olivere/elastic.v3"
)

var esClient *elastic.Client

func initElasticsearchClient(host string, port string) {
    var err error
    esClient, err = elastic.NewClient(
        elastic.SetURL(fmt.Sprintf("http://%s:%s", host, port)),
        elastic.SetMaxRetries(3),
    )

    if err != nil {
        // log error
    }
}

插入:

func insertDocument(db string, table string, obj map[string]interface{}) {

    id := obj["id"]

    var indexName, typeName string
    // 數(shù)據(jù)庫中的 database/table 概念,可以簡單映射到 es 的 index 和 type
    // 不過需要注意,因為 es 中的 _type 本質(zhì)上只是 document 的一個字段
    // 所以單個 index 內(nèi)容過多會導(dǎo)致性能問題
    // 在新版本中 type 已經(jīng)廢棄
    // 為了讓不同表的數(shù)據(jù)落入不同的 index,這里我們用 table+name 作為 index 的名字
    indexName = fmt.Sprintf("%v_%v", db, table)
    typeName = table

    // 正常情況
    res, err := esClient.Index().Index(indexName).Type(typeName).Id(id).BodyJson(obj).Do()
    if err != nil {
        // handle error
    } else {
        // insert success
    }
}

獲?。?

func query(indexName string, typeName string) (*elastic.SearchResult, error) {
    // 通過 bool must 和 bool should 添加 bool 查詢條件
    q := elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("id", 1),
    elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("male", "m")))

    q = q.Should(
        elastic.NewMatchPhraseQuery("name", "alex"),
        elastic.NewMatchPhraseQuery("name", "xargin"),
    )

    searchService := esClient.Search(indexName).Type(typeName)
    res, err := searchService.Query(q).Do()
    if err != nil {
        // log error
        return nil, err
    }

    return res, nil
}

刪除:

func deleteDocument(
    indexName string, typeName string, obj map[string]interface{},
) {
    id := obj["id"]

    res, err := esClient.Delete().Index(indexName).Type(typeName).Id(id).Do()
    if err != nil {
        // handle error
    } else {
        // delete success
    }
}

因為 Lucene 的性質(zhì),本質(zhì)上搜索引擎內(nèi)的數(shù)據(jù)是不可變的,所以如果要對文檔進行更新,Lucene 內(nèi)部是按照 id 進行完全覆蓋 (本質(zhì)是取同一 id 最新的 segment 中的數(shù)據(jù)) 的操作,所以與插入的情況是一樣的。

使用 es 作為數(shù)據(jù)庫使用時,需要注意,因為 es 有索引合并的操作,所以數(shù)據(jù)插入到 es 中到可以查詢的到需要一段時間(由 es 的 refresh_interval 決定)。所以千萬不要把 es 當(dāng)成強一致的關(guān)系型數(shù)據(jù)庫來使用。

將 sql 轉(zhuǎn)換為 DSL

比如我們有一段 bool 表達式,user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1),寫成 SQL 是如下形式:

select * from xxx where user_id = 1 and (
    product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1
)

寫成 es 的 DSL 是如下形式:

{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "user_id": {
              "query": "1",
              "type": "phrase"
            }
          }
        },
        {
          "match": {
            "product_id": {
              "query": "1",
              "type": "phrase"
            }
          }
        },
        {
          "bool": {
            "should": [
              {
                "match": {
                  "star_num": {
                    "query": "4",
                    "type": "phrase"
                  }
                }
              },
              {
                "match": {
                  "star_num": {
                    "query": "5",
                    "type": "phrase"
                  }
                }
              }
            ]
          }
        },
        {
          "match": {
            "banned": {
              "query": "1",
              "type": "phrase"
            }
          }
        }
      ]
    }
  },
  "from": 0,
  "size": 1
}

es 的 DSL 雖然很好理解,但是手寫起來非常費勁。前面提供了基于 SDK 的方式來寫,但也不足夠靈活。

SQL 的 where 部分就是 boolean expression。我們之前提到過,這種 bool 表達式在被解析之后,和 es 的 DSL 的結(jié)構(gòu)長得差不多,我們能不能直接通過這種 “差不多” 的猜測來直接幫我們把 SQL 轉(zhuǎn)換成 DSL 呢?

當(dāng)然可以,我們把 SQL 的 where 被 Parse 之后的結(jié)構(gòu)和 es 的 DSL 的結(jié)構(gòu)做個對比:


圖 6-12 AST 和 DSL 之間的對應(yīng)關(guān)系

既然結(jié)構(gòu)上完全一致,邏輯上我們就可以相互轉(zhuǎn)換。我們以廣度優(yōu)先對 AST 樹進行遍歷,然后將二元表達式轉(zhuǎn)換成 json 字符串,再拼裝起來就可以了,限于篇幅,本文中就不給出示例了,讀者朋友可以查看:

github.com/cch123/elasticsql

來學(xué)習(xí)具體的實現(xiàn)。

異構(gòu)數(shù)據(jù)同步

在實際應(yīng)用中,我們很少直接向搜索引擎中寫入數(shù)據(jù)。更為常見的方式是,將 MySQL 或其它關(guān)系型數(shù)據(jù)中的數(shù)據(jù)同步到搜索引擎中。而搜索引擎的使用方只能對數(shù)據(jù)進行查詢,無法進行修改和刪除。

常見的同步方案有兩種:

通過時間戳進行增量數(shù)據(jù)同步


圖 6-13 基于時間戳的數(shù)據(jù)同步

這種同步方式與業(yè)務(wù)強綁定,例如 WMS 系統(tǒng)中的出庫單,我們并不需要非常實時,稍微有延遲也可以接受,那么我們可以每分鐘從 MySQL 的出庫單表中,把最近十分鐘創(chuàng)建的所有出庫單取出,批量存入 es 中,取數(shù)據(jù)的操作需要執(zhí)行的邏輯可以表達為下面的 SQL:

select * from wms_orders where update_time >= date_sub(now(), interval 10 minute);

當(dāng)然,考慮到邊界情況,我們可以讓這個時間段的數(shù)據(jù)與前一次的有一些重疊:

select * from wms_orders where update_time >= date_sub(
    now(), interval 11 minute
);

取最近 11 分鐘有變動的數(shù)據(jù)覆蓋更新到 es 中。這種方案的缺點顯而易見,我們必須要求業(yè)務(wù)數(shù)據(jù)嚴格遵守一定的規(guī)范。比如這里的,必須要有 update_time 字段,并且每次創(chuàng)建和更新都要保證該字段有正確的時間值。否則我們的同步邏輯就會丟失數(shù)據(jù)。

通過 binlog 進行數(shù)據(jù)同步


圖 6-13 基于 binlog 的數(shù)據(jù)同步

業(yè)界使用較多的是阿里開源的 Canal,來進行 binlog 解析與同步。canal 會偽裝成 MySQL 的從庫,然后解析好行格式的 binlog,再以更容易解析的格式(例如 json)發(fā)送到消息隊列。

由下游的 Kafka 消費者負責(zé)把上游數(shù)據(jù)表的自增主鍵作為 es 的文檔的 id 進行寫入,這樣可以保證每次接收到 binlog 時,對應(yīng) id 的數(shù)據(jù)都被覆蓋更新為最新。MySQL 的 Row 格式的 binlog 會將每條記錄的所有字段都提供給下游,所以在向異構(gòu)數(shù)據(jù)目標同步數(shù)據(jù)時,不需要考慮數(shù)據(jù)是插入還是更新,只要一律按 id 進行覆蓋即可。

這種模式同樣需要業(yè)務(wù)遵守一條數(shù)據(jù)表規(guī)范,即表中必須有唯一主鍵 id 來保證我們進入 es 的數(shù)據(jù)不會發(fā)生重復(fù)。一旦不遵守該規(guī)范,那么就會在同步時導(dǎo)致數(shù)據(jù)重復(fù)。當(dāng)然,你也可以為每一張需要的表去定制消費者的邏輯,這就不是通用系統(tǒng)討論的范疇了。



以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號