原文鏈接:https://chai2010.cn/advanced-go-programming-book/ch6-cloud/ch6-04-search-engine.html
在 Web 一章中,我們提到 MySQL 很脆弱。數(shù)據(jù)庫系統(tǒng)本身要保證實時和強一致性,所以其功能設(shè)計上都是為了滿足這種一致性需求。比如 write ahead log 的設(shè)計,基于 B + 樹實現(xiàn)的索引和數(shù)據(jù)組織,以及基于 MVCC 實現(xiàn)的事務(wù)等等。
關(guān)系型數(shù)據(jù)庫一般被用于實現(xiàn) OLTP 系統(tǒng),所謂 OLTP,援引 wikipedia:
在線交易處理(OLTP, Online transaction processing)是指透過信息系統(tǒng)、電腦網(wǎng)絡(luò)及數(shù)據(jù)庫,以線上交易的方式處理一般即時性的作業(yè)數(shù)據(jù),和更早期傳統(tǒng)數(shù)據(jù)庫系統(tǒng)大量批量的作業(yè)方式并不相同。OLTP 通常被運用于自動化的數(shù)據(jù)處理工作,如訂單輸入、金融業(yè)務(wù)… 等反復(fù)性的日常性交易活動。和其相對的是屬于決策分析層次的聯(lián)機分析處理(OLAP)。
在互聯(lián)網(wǎng)的業(yè)務(wù)場景中,也有一些實時性要求不高 (可以接受多秒的延遲),但是查詢復(fù)雜性卻很高的場景。舉個例子,在電商的 WMS 系統(tǒng)中,或者在大多數(shù)業(yè)務(wù)場景豐富的 CRM 或者客服系統(tǒng)中,可能需要提供幾十個字段的隨意組合查詢功能。這種系統(tǒng)的數(shù)據(jù)維度天生眾多,比如一個電商的 WMS 中對一件貨物的描述,可能有下面這些字段:
倉庫 id,入庫時間,庫位分區(qū) id,儲存貨架 id,入庫操作員 id,出庫操作員 id,庫存數(shù)量,過期時間,SKU 類型,產(chǎn)品品牌,產(chǎn)品分類,內(nèi)件數(shù)量
除了上述信息,如果商品在倉庫內(nèi)有流轉(zhuǎn)??赡苓€有有關(guān)聯(lián)的流程 id,當(dāng)前的流轉(zhuǎn)狀態(tài)等等。
想像一下,如果我們所經(jīng)營的是一個大型電商,每天有千萬級別的訂單,那么在這個數(shù)據(jù)庫中查詢和建立合適的索引都是一件非常難的事情。
在 CRM 或客服類系統(tǒng)中,常常有根據(jù)關(guān)鍵字進行搜索的需求,大型互聯(lián)網(wǎng)公司每天會接收數(shù)以萬計的用戶投訴。而考慮到事件溯源,用戶的投訴至少要存 2~3 年。又是千萬級甚至上億的數(shù)據(jù)。根據(jù)關(guān)鍵字進行一次 like 查詢,可能整個 MySQL 就直接掛掉了。
這時候我們就需要搜索引擎來救場了。
Elasticsearch 是開源分布式搜索引擎的霸主,其依賴于 Lucene 實現(xiàn),在部署和運維方面做了很多優(yōu)化。當(dāng)今搭建一個分布式搜索引擎比起 Sphinx 的時代已經(jīng)是容易很多很多了。只要簡單配置客戶端 IP 和端口就可以了。
雖然 es 是針對搜索場景來定制的,但如前文所言,實際應(yīng)用中常常用 es 來作為 database 來使用,就是因為倒排列表的特性??梢杂帽容^樸素的觀點來理解倒排索引:
圖 6-10 倒排列表
對 Elasticsearch 中的數(shù)據(jù)進行查詢時,本質(zhì)就是求多個排好序的序列求交集。非數(shù)值類型字段涉及到分詞問題,大多數(shù)內(nèi)部使用場景下,我們可以直接使用默認的 bi-gram 分詞。什么是 bi-gram 分詞呢:
即將所有 Ti
和 T(i+1)
組成一個詞(在 Elasticsearch 中叫 term),然后再編排其倒排列表,這樣我們的倒排列表大概就是這樣的:
圖 6-11 “今天天氣很好” 的分詞結(jié)果
當(dāng)用戶搜索'天氣很好'時,其實就是求:天氣、氣很、很好三組倒排列表的交集,但這里的相等判斷邏輯有些特殊,用偽代碼表示一下:
func equal() {
if postEntry.docID of '天氣' == postEntry.docID of '氣很' &&
postEntry.offset + 1 of '天氣' == postEntry.offset of '氣很' {
return true
}
if postEntry.docID of '氣很' == postEntry.docID of '很好' &&
postEntry.offset + 1 of '氣很' == postEntry.offset of '很好' {
return true
}
if postEntry.docID of '天氣' == postEntry.docID of '很好' &&
postEntry.offset + 2 of '天氣' == postEntry.offset of '很好' {
return true
}
return false
}
多個有序列表求交集的時間復(fù)雜度是:O(N*M)
,N 為給定列表當(dāng)中元素數(shù)最小的集合,M 為給定列表的個數(shù)。
在整個算法中起決定作用的一是最短的倒排列表的長度,其次是詞數(shù)總和,一般詞數(shù)不會很大(想像一下,你會在搜索引擎里輸入幾百字來搜索么?),所以起決定性作用的,一般是所有倒排列表中,最短的那一個的長度。
因此,文檔總數(shù)很多的情況下,搜索詞的倒排列表最短的那一個不長時,搜索速度也是很快的。如果用關(guān)系型數(shù)據(jù)庫,那就需要按照索引(如果有的話)來慢慢掃描了。
es 定義了一套查詢 DSL,當(dāng)我們把 es 當(dāng)數(shù)據(jù)庫使用時,需要用到其 bool 查詢。舉個例子:
{
"query": {
"bool": {
"must": [
{
"match": {
"field_1": {
"query": "1",
"type": "phrase"
}
}
},
{
"match": {
"field_2": {
"query": "2",
"type": "phrase"
}
}
},
{
"match": {
"field_3": {
"query": "3",
"type": "phrase"
}
}
},
{
"match": {
"field_4": {
"query": "4",
"type": "phrase"
}
}
}
]
}
},
"from": 0,
"size": 1
}
看起來比較麻煩,但表達的意思很簡單:
if field_1 == 1 && field_2 == 2 && field_3 == 3 && field_4 == 4 {
return true
}
用 bool should query 可以表示 or 的邏輯:
{
"query": {
"bool": {
"should": [
{
"match": {
"field_1": {
"query": "1",
"type": "phrase"
}
}
},
{
"match": {
"field_2": {
"query": "3",
"type": "phrase"
}
}
}
]
}
},
"from": 0,
"size": 1
}
這里表示的是類似:
if field_1 == 1 || field_2 == 2 {
return true
}
這些 Go 代碼里 if
后面跟著的表達式在編程語言中有專有名詞來表達 Boolean Expression
:
4 > 1
5 == 2
3 <i && x> 10
es 的 Bool Query
方案,就是用 json 來表達了這種程序語言中的 Boolean Expression,為什么可以這么做呢?因為 json 本身是可以表達樹形結(jié)構(gòu)的,我們的程序代碼在被編譯器 parse 之后,也會變成 AST,而 AST 抽象語法樹,顧名思義,就是樹形結(jié)構(gòu)。理論上 json 能夠完備地表達一段程序代碼被 parse 之后的結(jié)果。這里的 Boolean Expression 被編譯器 Parse 之后也會生成差不多的樹形結(jié)構(gòu),而且只是整個編譯器實現(xiàn)的一個很小的子集。
初始化:
// 選用 elastic 版本時
// 注意與自己使用的 elasticsearch 要對應(yīng)
import (
elastic "gopkg.in/olivere/elastic.v3"
)
var esClient *elastic.Client
func initElasticsearchClient(host string, port string) {
var err error
esClient, err = elastic.NewClient(
elastic.SetURL(fmt.Sprintf("http://%s:%s", host, port)),
elastic.SetMaxRetries(3),
)
if err != nil {
// log error
}
}
插入:
func insertDocument(db string, table string, obj map[string]interface{}) {
id := obj["id"]
var indexName, typeName string
// 數(shù)據(jù)庫中的 database/table 概念,可以簡單映射到 es 的 index 和 type
// 不過需要注意,因為 es 中的 _type 本質(zhì)上只是 document 的一個字段
// 所以單個 index 內(nèi)容過多會導(dǎo)致性能問題
// 在新版本中 type 已經(jīng)廢棄
// 為了讓不同表的數(shù)據(jù)落入不同的 index,這里我們用 table+name 作為 index 的名字
indexName = fmt.Sprintf("%v_%v", db, table)
typeName = table
// 正常情況
res, err := esClient.Index().Index(indexName).Type(typeName).Id(id).BodyJson(obj).Do()
if err != nil {
// handle error
} else {
// insert success
}
}
獲?。?
func query(indexName string, typeName string) (*elastic.SearchResult, error) {
// 通過 bool must 和 bool should 添加 bool 查詢條件
q := elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("id", 1),
elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("male", "m")))
q = q.Should(
elastic.NewMatchPhraseQuery("name", "alex"),
elastic.NewMatchPhraseQuery("name", "xargin"),
)
searchService := esClient.Search(indexName).Type(typeName)
res, err := searchService.Query(q).Do()
if err != nil {
// log error
return nil, err
}
return res, nil
}
刪除:
func deleteDocument(
indexName string, typeName string, obj map[string]interface{},
) {
id := obj["id"]
res, err := esClient.Delete().Index(indexName).Type(typeName).Id(id).Do()
if err != nil {
// handle error
} else {
// delete success
}
}
因為 Lucene 的性質(zhì),本質(zhì)上搜索引擎內(nèi)的數(shù)據(jù)是不可變的,所以如果要對文檔進行更新,Lucene 內(nèi)部是按照 id 進行完全覆蓋 (本質(zhì)是取同一 id 最新的 segment 中的數(shù)據(jù)) 的操作,所以與插入的情況是一樣的。
使用 es 作為數(shù)據(jù)庫使用時,需要注意,因為 es 有索引合并的操作,所以數(shù)據(jù)插入到 es 中到可以查詢的到需要一段時間(由 es 的 refresh_interval 決定)。所以千萬不要把 es 當(dāng)成強一致的關(guān)系型數(shù)據(jù)庫來使用。
比如我們有一段 bool 表達式,user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1)
,寫成 SQL 是如下形式:
select * from xxx where user_id = 1 and (
product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1
)
寫成 es 的 DSL 是如下形式:
{
"query": {
"bool": {
"must": [
{
"match": {
"user_id": {
"query": "1",
"type": "phrase"
}
}
},
{
"match": {
"product_id": {
"query": "1",
"type": "phrase"
}
}
},
{
"bool": {
"should": [
{
"match": {
"star_num": {
"query": "4",
"type": "phrase"
}
}
},
{
"match": {
"star_num": {
"query": "5",
"type": "phrase"
}
}
}
]
}
},
{
"match": {
"banned": {
"query": "1",
"type": "phrase"
}
}
}
]
}
},
"from": 0,
"size": 1
}
es 的 DSL 雖然很好理解,但是手寫起來非常費勁。前面提供了基于 SDK 的方式來寫,但也不足夠靈活。
SQL 的 where 部分就是 boolean expression。我們之前提到過,這種 bool 表達式在被解析之后,和 es 的 DSL 的結(jié)構(gòu)長得差不多,我們能不能直接通過這種 “差不多” 的猜測來直接幫我們把 SQL 轉(zhuǎn)換成 DSL 呢?
當(dāng)然可以,我們把 SQL 的 where 被 Parse 之后的結(jié)構(gòu)和 es 的 DSL 的結(jié)構(gòu)做個對比:
圖 6-12 AST 和 DSL 之間的對應(yīng)關(guān)系
既然結(jié)構(gòu)上完全一致,邏輯上我們就可以相互轉(zhuǎn)換。我們以廣度優(yōu)先對 AST 樹進行遍歷,然后將二元表達式轉(zhuǎn)換成 json 字符串,再拼裝起來就可以了,限于篇幅,本文中就不給出示例了,讀者朋友可以查看:
github.com/cch123/elasticsql
來學(xué)習(xí)具體的實現(xiàn)。
在實際應(yīng)用中,我們很少直接向搜索引擎中寫入數(shù)據(jù)。更為常見的方式是,將 MySQL 或其它關(guān)系型數(shù)據(jù)中的數(shù)據(jù)同步到搜索引擎中。而搜索引擎的使用方只能對數(shù)據(jù)進行查詢,無法進行修改和刪除。
常見的同步方案有兩種:
圖 6-13 基于時間戳的數(shù)據(jù)同步
這種同步方式與業(yè)務(wù)強綁定,例如 WMS 系統(tǒng)中的出庫單,我們并不需要非常實時,稍微有延遲也可以接受,那么我們可以每分鐘從 MySQL 的出庫單表中,把最近十分鐘創(chuàng)建的所有出庫單取出,批量存入 es 中,取數(shù)據(jù)的操作需要執(zhí)行的邏輯可以表達為下面的 SQL:
select * from wms_orders where update_time >= date_sub(now(), interval 10 minute);
當(dāng)然,考慮到邊界情況,我們可以讓這個時間段的數(shù)據(jù)與前一次的有一些重疊:
select * from wms_orders where update_time >= date_sub(
now(), interval 11 minute
);
取最近 11 分鐘有變動的數(shù)據(jù)覆蓋更新到 es 中。這種方案的缺點顯而易見,我們必須要求業(yè)務(wù)數(shù)據(jù)嚴格遵守一定的規(guī)范。比如這里的,必須要有 update_time 字段,并且每次創(chuàng)建和更新都要保證該字段有正確的時間值。否則我們的同步邏輯就會丟失數(shù)據(jù)。
圖 6-13 基于 binlog 的數(shù)據(jù)同步
業(yè)界使用較多的是阿里開源的 Canal,來進行 binlog 解析與同步。canal 會偽裝成 MySQL 的從庫,然后解析好行格式的 binlog,再以更容易解析的格式(例如 json)發(fā)送到消息隊列。
由下游的 Kafka 消費者負責(zé)把上游數(shù)據(jù)表的自增主鍵作為 es 的文檔的 id 進行寫入,這樣可以保證每次接收到 binlog 時,對應(yīng) id 的數(shù)據(jù)都被覆蓋更新為最新。MySQL 的 Row 格式的 binlog 會將每條記錄的所有字段都提供給下游,所以在向異構(gòu)數(shù)據(jù)目標同步數(shù)據(jù)時,不需要考慮數(shù)據(jù)是插入還是更新,只要一律按 id 進行覆蓋即可。
這種模式同樣需要業(yè)務(wù)遵守一條數(shù)據(jù)表規(guī)范,即表中必須有唯一主鍵 id 來保證我們進入 es 的數(shù)據(jù)不會發(fā)生重復(fù)。一旦不遵守該規(guī)范,那么就會在同步時導(dǎo)致數(shù)據(jù)重復(fù)。當(dāng)然,你也可以為每一張需要的表去定制消費者的邏輯,這就不是通用系統(tǒng)討論的范疇了。
更多建議: