Kafka Connect 是一款出色的工具,可讓您輕松設(shè)置從一個(gè)數(shù)據(jù)源到目標(biāo)數(shù)據(jù)庫的連續(xù)數(shù)據(jù)流。它的配置非常簡單,當(dāng)您有遺留系統(tǒng)為您需要的業(yè)務(wù)數(shù)據(jù)提供服務(wù)時(shí),出于某種原因或其他原因,它在不同的地方非常有用。我的典型用例是將數(shù)據(jù)從 Oracle 表移動到微服務(wù)使用的 MongoDB 集合。這允許更好的可擴(kuò)展性,因?yàn)槲覀儾槐厥褂蒙a(chǎn)查詢大量訪問源表。
當(dāng)您打開 Kafka Connect 手冊時(shí),不容易解釋的一件事是如何處理修改已移動的現(xiàn)有數(shù)據(jù)的操作;或者換句話說,更新和刪除。我認(rèn)為這是我們使用的典型 JDBC/MongoDB 連接器對的限制。有一段時(shí)間我探索了 Debezium 連接器,它承諾捕獲這些類型的事件并將它們復(fù)制到目標(biāo)數(shù)據(jù)庫中。使用 OracleDB 的 POC 對我們來說并不成功。我們對這些數(shù)據(jù)庫的訪問有限,而且這些連接器所需的配置級別并不是一個(gè)簡單的解決方案。
當(dāng)我們繼續(xù)使用連接器時(shí),我們發(fā)現(xiàn)有一些方法可以處理這些場景。我將解釋兩種策略。第一個(gè)是最理想的,需要在我們的源數(shù)據(jù)庫中進(jìn)行特定設(shè)計(jì)。如果該設(shè)計(jì)不存在且因任何原因無法更改,則第二個(gè)是替代解決方案。
基本示例
假設(shè)我們有一個(gè)處理促銷活動的舊系統(tǒng)。為了簡化我們的示例,假設(shè)我們有一個(gè)包含三列的基本表。我們需要不斷地將這些數(shù)據(jù)從 SQL 數(shù)據(jù)庫移動到基于文檔的數(shù)據(jù)庫,如 MongoDB。
基本概念
首先,我們需要對可以使用的兩種 Kafka 連接器進(jìn)行快速描述:增量和批量。嚴(yán)格來說,JDBC連接器有四種模式:bulk、timestamp、incrementing、timestamp+incrementing。我將最后三個(gè)分組為增量,因?yàn)樗鼈児蚕硐嗤幕靖拍?。您只想移動從源中檢測到的新數(shù)據(jù)。
批量連接器始終移動整個(gè)數(shù)據(jù)集。但是,很大程度上取決于我們正在移動的數(shù)據(jù)的用例。理想情況下,增量連接器是最好的解決方案,因?yàn)樵谫Y源使用或數(shù)據(jù)準(zhǔn)備方面更容易管理小塊新數(shù)據(jù)。這里的問題是:Kafka Connect 如何使用純 SQL 查詢,以及它如何知道何時(shí)在源中插入了新數(shù)據(jù)?
源連接器配置可以使用以下兩個(gè)屬性之一(或兩者):incrementing.column.name 和 timestamp.column.name。Incrementing 屬性使用增量列(如自動生成的 id)來檢測何時(shí)插入新行。Timestamp 屬性使用 DateTime 列來檢測新更改。Kafka Connect 持有一個(gè)偏移量,將其附加到用于從源獲取數(shù)據(jù)的 SQL 查詢中。
例如,如果我們的表名為“promotions”,我們將在源連接器的查詢屬性中使用,如下所示:
"query": "SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS",
"timestamp.column.name": "LAST_UPDATE_DATE"
Kafka 內(nèi)部將查詢修改為如下所示:
SELECT * FROM ( SELECT TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS)
WHERE LAST_UPDATE_DATE > {OFFSET_DATE}
在接收器連接器端,即在目標(biāo)數(shù)據(jù)庫中保存數(shù)據(jù)的連接器,我們需要設(shè)置一個(gè)策略來根據(jù) ID 進(jìn)行正確的 upsert。您可以在您使用的接收器連接器的文檔中閱讀更多相關(guān)信息。對于 MongoDB 連接器,我使用的典型設(shè)置是:
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInValueStrategy",
這表明我們文檔的 _id 將來自源數(shù)據(jù)。在這種情況下,我們的源查詢應(yīng)該包含一個(gè) _id 列:
"query": "SELECT PROMO_ID as \"_id\", TITLE, DISCOUNT, PRODUCT_CATEGORY FROM PROMOTIONS"
至此,我們有了檢測新插入的基本配置。每次添加帶有新時(shí)間戳的新促銷時(shí),源連接器都會抓取它并將其移動到所需的目的地。但是有了這個(gè)完全相同的配置,我們就可以實(shí)現(xiàn)檢測更新和刪除的總目標(biāo)。我們需要的是正確設(shè)計(jì)我們的數(shù)據(jù)源。
在每次更新時(shí)修改時(shí)間戳列
如果我們想確保我們的更新被處理并反映在目標(biāo)數(shù)據(jù)庫中,我們需要確保在源表中進(jìn)行的每個(gè)更新也更新時(shí)間戳列值。這可以通過寫入它的應(yīng)用程序?qū)?dāng)前時(shí)間戳作為更新操作的參數(shù)來完成,或者創(chuàng)建一個(gè)監(jiān)聽更新事件的觸發(fā)器。由于 sink 連接器根據(jù) id 處理 upsert,更新也會反映在目標(biāo)文檔中。
軟刪除
為了能夠處理刪除,我們需要前面的步驟以及數(shù)據(jù)庫設(shè)計(jì)中被認(rèn)為是好的做法:軟刪除。這種做法是在需要時(shí)不刪除(硬刪除)數(shù)據(jù)庫中的記錄,而只是用一個(gè)特殊的標(biāo)志來標(biāo)記它,表明該記錄不再有效/活動。這在可恢復(fù)性或?qū)徲?jì)方面有其自身的好處。這當(dāng)然意味著我們的應(yīng)用程序或存儲過程需要了解這種設(shè)計(jì)并在查詢數(shù)據(jù)時(shí)過濾掉不活動的記錄。
如果很難更新刪除記錄的應(yīng)用程序來進(jìn)行軟刪除(以防數(shù)據(jù)源的設(shè)計(jì)沒有考慮到這一點(diǎn)),我們還可以使用觸發(fā)器來捕獲硬刪除并改為進(jìn)行軟刪除。
為了我們的 Kafka Connect 目的,我們需要做的是在記錄被標(biāo)記為非活動時(shí)更改我們的時(shí)間戳列值。在此示例中,我們將 HOT SUMMER 促銷設(shè)置為非活動,將 ACTIVE 列設(shè)置為 0。LAST_UPDATE_DATE 還修改為最近的日期,這將使源連接器獲取記錄。
當(dāng)數(shù)據(jù)被移動時(shí),例如移動到 MongoDB,為了使用它,我們還需要根據(jù)這個(gè) ACTIVE 字段進(jìn)行過濾:
db.getCollection('promotions').find({active: 1})
版本化批量
如果我們必須處理不可更改的設(shè)計(jì),則可以使用的最后一種方法選項(xiàng)不允許修改源模式以具有時(shí)間戳列或活動標(biāo)志。這個(gè)選項(xiàng)有我所說的版本化批量。正如我之前所解釋的,每次調(diào)用時(shí),批量連接器都會移動整個(gè)數(shù)據(jù)集。在大多數(shù)情況下,我遇到過增量更新總是更可取的做法,但在這種情況下,我們可以利用批量選項(xiàng)。
由于我們需要跟蹤新插入、更新或刪除的內(nèi)容,因此我們可以每次移動數(shù)據(jù),添加一個(gè)額外的列來標(biāo)識數(shù)據(jù)的快照。我們還可以使用查詢數(shù)據(jù)時(shí)的時(shí)間戳。由于時(shí)間戳是自然后代排序的值,如果我們想要最新的快照,我們可以很容易地通過最后一個(gè)或倒數(shù)第二個(gè)(我將解釋為什么這可能更好)一旦數(shù)據(jù)移動到目標(biāo)位置的快照進(jìn)行過濾。
Oracle 中的查詢?nèi)缦滤荆?/p>
"query": "SELECT PROMO_ID as \"_id\", TITLE, DISCOUNT, PRODUCT_CATEGORY,
TO_CHAR(SYSDATE, 'yyyymmddhh24miss') AS SNAPSHOT FROM PROMOTIONS"
這種方法需要一些配置,這些配置對于使用最終數(shù)據(jù)集時(shí)的正確性能至關(guān)重要。您可以想象,索引在這里很重要,更重要的是,在新的快照列中。另一個(gè)重要的考慮因素是消耗的空間。根據(jù)每個(gè)快照中的記錄數(shù)量,我們可能需要?jiǎng)h除舊版本。我們可以為此使用一些計(jì)劃任務(wù),或者像使用 MongoDB 索引一樣配置 TTL。
在使用數(shù)據(jù)時(shí),我們首先需要獲取最新的快照。我提到倒數(shù)第二個(gè)可能更好。原因是最新的可能是正在進(jìn)行的。換句話說,當(dāng)您執(zhí)行查詢以使用數(shù)據(jù)時(shí),數(shù)據(jù)可能會移動。如果您對目標(biāo)數(shù)據(jù)庫的查詢是任何類型的聚合,您可能會得到不完整的結(jié)果。因此,對于最新的快照,我們不確定它是否處于準(zhǔn)備好使用的狀態(tài)。如果我們抓取倒數(shù)第二個(gè),我們可以確定快照是完整的。
在下一個(gè)示例中,移動了數(shù)據(jù)的兩個(gè)版本。版本 2021073012000包含三個(gè)文檔。較新的版本2021080112000有兩個(gè)文檔,一個(gè)文檔有折扣的更新版本。如您所見,每個(gè)版本都是數(shù)據(jù)源的時(shí)間快照。
這種方法有點(diǎn)棘手,不應(yīng)該是我們的第一選擇。