平時工作中數(shù)據(jù)庫是我們經(jīng)常使用的,在微服務(wù)拆分的架構(gòu)中,各服務(wù)擁有自己的數(shù)據(jù)庫,所以常常會遇到服務(wù)之間數(shù)據(jù)通信的問題。比如,B 服務(wù)數(shù)據(jù)庫的數(shù)據(jù)來源于A服務(wù)的數(shù)據(jù)庫;A 服務(wù)的數(shù)據(jù)有變更操作時,需要同步到 B 服務(wù)中。
第一種解決方案:
在代碼邏輯中,有相關(guān) A 服務(wù)數(shù)據(jù)寫操作時,以調(diào)用接口的方式,調(diào)用 B 服務(wù)接口,B 服務(wù)再將數(shù)據(jù)寫到新的數(shù)據(jù)庫中。這種方式看似簡單,但其實“坑”很多。在 A 服務(wù)代碼邏輯中會增加大量這種調(diào)用接口同步的代碼,增加了項目代碼的復(fù)雜度,以后會越來越難維護。并且,接口調(diào)用的方式并不是一個穩(wěn)定的方式,沒有重試機制,沒有同步位置記錄,接口調(diào)用失敗了怎么處理,突然的大量接口調(diào)用會產(chǎn)生的問題等,這些都要考慮并且在業(yè)務(wù)中處理。這里會有不少工作量。想到這里,就將這個方案排除了。
(推薦課程:SQL教程)
第二種解決方案:
通過數(shù)據(jù)庫的binlog
進行同步。這種解決方案,與 A 服務(wù)是獨立的,不會和 A 服務(wù)有代碼上的耦合??梢灾苯?TCP
連接進行傳輸數(shù)據(jù),優(yōu)于接口調(diào)用的方式。 這是一套成熟的生產(chǎn)解決方案,也有不少binlog
同步的中間件工具,所以我們關(guān)注的就是哪個工具能夠更好的構(gòu)建穩(wěn)定、性能滿足且易于高可用部署的方案。
經(jīng)過調(diào)研,我們選擇了canal。canal
是阿里巴巴 MySQL binlog
增量訂閱&消費組件,已經(jīng)有在生產(chǎn)上實踐的例子,并且方便的支持和其他常用的中間件組件組合,比如kafka
,elasticsearch
等,也有了canal-go
go
語言的client
庫,滿足我們在go
上的需求,其他具體內(nèi)容參閱canal
的github
主頁。
原理簡圖
![原理簡圖](https://atts.w3cschool.cn/attachments/image/20200817/1597643005519402.jpg "原理簡圖")
![原理簡圖](https://atts.w3cschool.cn/attachments/image/20200817/1597643026794359.jpg "原理簡圖")
OK,開始干!現(xiàn)在要將 A 數(shù)據(jù)庫的數(shù)據(jù)變更同步到 B 數(shù)據(jù)庫。根據(jù)wiki
很快就用docker
跑起了一臺canal-server
服務(wù),直接用canal-go
寫canal-client
代碼邏輯。用canal-go
直接連canal-server
,canal-server
和canal-client
之間是Socket
來進行通信的,傳輸協(xié)議是TCP
,交互協(xié)議采用的是 Google Protocol Buffer 3.0
。
工作流程
Canal
連接到 A 數(shù)據(jù)庫,模擬slave
canal-client
與Canal
建立連接,并訂閱對應(yīng)的數(shù)據(jù)庫表- A 數(shù)據(jù)庫發(fā)生變更寫入到
binlog
,Canal
向數(shù)據(jù)庫發(fā)送dump
請求,獲取binlog
并解析,發(fā)送解析后的數(shù)據(jù)給canal-client
canal-client
收到數(shù)據(jù),將數(shù)據(jù)同步到新的數(shù)據(jù)庫
Protocol Buffer
的序列化速度還是很快的。反序列化后得到的數(shù)據(jù),是每一行的數(shù)據(jù),按照字段名和字段的值的結(jié)構(gòu),放到一個數(shù)組中 代碼簡單示例:
func Handler(entry protocol.Entry) { var keys []string rowChange := &protocol.RowChange{} proto.Unmarshal(entry.GetStoreValue(), rowChange) if rowChange != nil { eventType := rowChange.GetEventType() for _, rowData := range rowChange.GetRowDatas() { // 遍歷每一行數(shù)據(jù) if eventType == protocol.EventType_DELETE || eventType == protocol.EventType_UPDATE { columns := rowData.GetBeforeColumns() // 得到更改前的所有字段屬性 } else if eventType == protocol.EventType_INSERT { columns := rowData.GetAfterColumns() // 得到更后前的所有字段屬性 } ...... } } }
遇到的問題
為了高可用和更高的性能,我們會創(chuàng)建多個canal-client
構(gòu)成一個集群,來進行解析并同步到新的數(shù)據(jù)庫。這里就出現(xiàn)了一個比較重要的問題,如何保證canal-client
集群解析消費binlog
的順序性呢?
我們使用的binlog
是row
模式。每一個寫操作都會產(chǎn)生一條binlog
日志。 舉個簡單的例子:插入了一條 a 記錄,并且立馬修改 a 記錄。這樣會有兩個消息發(fā)送給canal-client
,如果由于網(wǎng)絡(luò)等原因,更新的消息早于插入的消息被處理了,還沒有插入記錄,更新操作的最后效果是失敗的。
怎么辦呢? canal
可以和消息隊列組合呀!而且支持kafka
,rabbitmq
,rocketmq
多種選擇,如此優(yōu)秀。我們在消息隊列這層來實現(xiàn)消息的順序性。
選擇canal+kafka方案
我們選擇了消息隊列的業(yè)界標桿: kafka UCloud
提供了kafka
和rocketMQ
消息隊列產(chǎn)品服務(wù),使用它們能夠快速便捷的搭建起一套消息隊列系統(tǒng)。加速開發(fā),方便運維。
下面就讓我們來一探究竟:
1.選擇kafka
消息隊列產(chǎn)品,并申請開通
![kafka消息隊列](https://atts.w3cschool.cn/attachments/image/20200817/1597643207499951.jpg "kafka消息隊列")
2.開通完成后,在管理界面,創(chuàng)建kafka
集群,根據(jù)自身需求,選擇相應(yīng)的硬件配置
![硬件配置](https://atts.w3cschool.cn/attachments/image/20200817/1597643247605810.jpg "硬件配置")
3.一個kafka + ZooKeeper
集群就搭建出來了,給力!
![kafka+ZooKeeper集群](https://atts.w3cschool.cn/attachments/image/20200817/1597643275823817.jpg "kafka+ZooKeeper集群")
并且包含了節(jié)點管理、Topic
管理、Consumer Group
管理,能夠非常方便的直接在控制臺對配置進行修改
監(jiān)控視圖方面,監(jiān)控的數(shù)據(jù)包括kafka
生成和消費QPS
,集群監(jiān)控,ZooKeeper
的監(jiān)控。能夠比較完善的提供監(jiān)控指標。
![監(jiān)控指標](https://atts.w3cschool.cn/attachments/image/20200817/1597643316579478.jpg "監(jiān)控指標")
![監(jiān)控指標](https://atts.w3cschool.cn/attachments/image/20200817/1597643347799100.jpg "監(jiān)控指標")
![監(jiān)控指標](https://atts.w3cschool.cn/attachments/image/20200817/1597643363690805.jpg "監(jiān)控指標")
canal的kafka配置
canal
配上kafka
也非常的簡單。 vi /usr/local/canal/conf/canal.properties
...
# 可選項: tcp(默認), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9002
canal.mq.retries = 0
# flagMessage模式下可以調(diào)大該值, 但不要超過MQ消息體大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下請將該值改大, 建議50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默認50K, 由于kafka最大消息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數(shù)據(jù)的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為flat json格式對象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投遞是否使用事務(wù)
canal.mq.transaction = false
# mq config
canal.mq.topic=default
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\\\..*,.*\\\\..*
canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
canal.mq.partitionHash=mydatabase.mytable
解決順序消費問題
看到下面這一行配置
canal.mq.partitionHash=mydatabase.mytable
我們配置了kafka
的partitionHash
,并且我們一個Topic
就是一個表。這樣的效果就是,一個表的數(shù)據(jù)只會推到一個固定的partition
中,然后再推給consumer
進行消費處理,同步到新的數(shù)據(jù)庫。通過這種方式,解決了之前碰到的binlog
日志順序處理的問題。這樣即使我們部署了多個kafka consumer
端,構(gòu)成一個集群,這樣consumer
從一個partition
消費消息,就是消費處理同一個表的數(shù)據(jù)。這樣對于一個表來說,犧牲掉了并行處理,不過個人覺得,憑借kafka
的性能強大的處理架構(gòu),我們的業(yè)務(wù)在kafka
這個節(jié)點產(chǎn)生瓶頸并不容易。并且我們的業(yè)務(wù)目的不是實時一致性,在一定延遲下,兩個數(shù)據(jù)庫保證最終一致性。
(推薦微課:SQL微課)
下圖是最終的同步架構(gòu),我們在每一個服務(wù)節(jié)點都實現(xiàn)了集群化。全都跑在UCloud
的UK8s
服務(wù)上,保證了服務(wù)節(jié)點的高可用性。
canal
也是集群換,但是某一時刻只會有一臺canal
在處理binlog
,其他都是冗余服務(wù)。當(dāng)這臺canal
服務(wù)掛了,其中一臺冗余服務(wù)就會切換到工作狀態(tài)。同樣的,也是因為要保證binlog
的順序讀取,所以只能有一臺canal
在工作。
![最終同步架構(gòu)](https://atts.w3cschool.cn/attachments/image/20200817/1597643437868571.jpg "最終同步架構(gòu)")
并且,我們還用這套架構(gòu)進行緩存失效的同步。我們使用的緩存模式是:Cache-Aside
。同樣的,如果在代碼中數(shù)據(jù)更改的地方進行緩存失效操作,會將代碼變得復(fù)雜。所以,在上述架構(gòu)的基礎(chǔ)上,將復(fù)雜的觸發(fā)緩存失效的邏輯放到kafka-client
端統(tǒng)一處理,達到一定解耦的目的。
以上就是關(guān)于使用canal + Kafka
進行數(shù)據(jù)庫同步操作的相關(guān)介紹了,希望對大家有所幫助。