在一些小型項目當中,沒有引入消息中間件,也不想引入,但有一些業(yè)務邏輯想要解耦異步,那怎么辦呢?
我們的web項目,單獨內網部署,由于大數據背景,公司消息中間件統一使用的kafka,在一些小項目上kafka就顯得很笨重。 引入rocketmq或rabittmq也沒必要。 事件或多線程也不適合。
具體一點的,之前對接的一個系統,一張記錄表有10+以上的類型狀態(tài),新的需求是,針對每種狀態(tài)做出對應的不同的操作。 之前寫入這張記錄表的時候,方式也是五花八門,有的是單條記錄寫入,有的是批量寫入,有的調用了統一的service,有的呢直接調用了DAO層mapper直接寫入。
所以想找到一個統一入口進行切入處理,就不行了。
這個時候就算引入消息隊列,也需要在不同的業(yè)務方法里進行寫入消息的操作。業(yè)務方也不太愿意配合改。
可以使用觸發(fā)器,但它是屬于上個時代的產物,槽點太多。(這里并不是完全不主張使用觸發(fā)器,技術永遠是為業(yè)務服務的,只要評估覺得可行,就可以使用)那么這個時候,CDC技術就可以粉墨登場了。
CDC(change data capture)數據更改捕獲。常見的數據更改捕獲都是通過數據庫比如mysql的binlog來達到目的。
我們可以監(jiān)控mysql binlog日志,當寫入一條數據的時候,接收到數據變更日志,做出相應的操作。
這樣的好處是,只需導入依賴,不額外引入組件,同時無需改動之前的代碼。 兩邊完全解耦,互不干擾。
常見的CDC框架,比如,canal (非Camel)
canal [k?'n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費 早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業(yè)務需求,實現方式主要是基于業(yè)務 trigger 獲取增量變更。 從 2010 年開始,業(yè)務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業(yè)務。
它是基于日志增量訂閱和消費的業(yè)務,包括
數據庫鏡像 數據庫實時備份 索引構建和實時維護(拆分異構索引、倒排索引等) 業(yè)務 cache 刷新 帶業(yè)務邏輯的增量數據處理

它的原理
canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送dump 協議
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal );關注工眾號:碼猿技術專欄,回復關鍵詞:1111 獲取阿里內部Java性能調優(yōu)手冊!
canal 解析 binary log 對象(原始為 byte 流)
再比如,debezium(音同 dbzm 滴BZ姆)很多人可能不太了解. 包括databus,maxwell,flink cdc(大數據領域)等等,它們同屬CDC捕獲數據更改(change data capture)類的技術。

為什么是debezium
這么多技術框架,為什么選debezium?
看起來很多。但一一排除下來就debezium和canal。
sqoop,kettle,datax之類的工具,屬于前大數據時代的產物,地位類似于web領域的structs2。而且,它們基于查詢而非binlog日志,其實不屬于CDC。首先排除。
flink cdc是大數據領域的框架,一般web項目的數據量屬于大材小用了。
同時databus,maxwell相對比較冷門,用得比較少。
最后不用canal的原因有以下幾點。
canal需要安裝,這違背了“如非必要,勿增實體”的原則。
canal只能對MYSQL進行CDC監(jiān)控。有很大的局限性。
大數據領域非常流行的flink cdc(阿里團隊主導)底層使用的也是debezium,而非同是阿里出品的canal。
debezium可借助kafka組件,將變動的數據發(fā)到kafka topic,后續(xù)的讀取操作只需讀取kafka,可有效減少數據庫的讀取壓力。可保證一次語義,至少一次語義。 同時,也可基于內嵌部署模式,無需我們手動部署kafka集群,可滿足”如非必要,勿增實體“的原則。

Debezium是一個捕獲數據更改(CDC)平臺,并且利用Kafka和Kafka Connect實現了自己的持久性、可靠性和容錯性。 每一個部署在Kafka Connect分布式的、可擴展的、容錯性的服務中的connector監(jiān)控一個上游數據庫服務器,捕獲所有的數據庫更改, 然后記錄到一個或者多個Kafka topic(通常一個數據庫表對應一個kafka topic)。
Kafka確保所有這些數據更改事件都能夠多副本并且總體上有序(Kafka只能保證一個topic的單個分區(qū)內有序),這樣, 更多的客戶端可以獨立消費同樣的數據更改事件而對上游數據庫系統造成的影響降到很小(如果N個應用都直接去監(jiān)控數據庫更改,對數據庫的壓力為N, 而用debezium匯報數據庫更改事件到kafka,所有的應用都去消費kafka中的消息,可以把對數據庫的壓力降到1)。
另外,客戶端可以隨時停止消費,然后重啟, 從上次停止消費的地方接著消費。每個客戶端可以自行決定他們是否需要exactly-once或者at-least-once消息交付語義保證, 并且所有的數據庫或者表的更改事件是按照上游數據庫發(fā)生的順序被交付的。

對于不需要或者不想要這種容錯級別、性能、可擴展性、可靠性的應用,他們可以使用內嵌的Debezium connector引擎來直接在應用內部運行connector。 這種應用仍需要消費數據庫更改事件,但更希望connector直接傳遞給它,而不是持久化到Kafka里。
簡介
Debezium是一個開源項目,為捕獲數據更改(change data capture,CDC)提供了一個低延遲的流式處理平臺。你可以安裝并且配置Debezium去監(jiān)控你的數據庫,然后你的應用就可以消費對數據庫的每一個行級別(row-level)的更改。只有已提交的更改才是可見的,所以你的應用不用擔心事務(transaction)或者更改被回滾(roll back)。Debezium為所有的數據庫更改事件提供了一個統一的模型,所以你的應用不用擔心每一種數據庫管理系統的錯綜復雜性。另外,由于Debezium用持久化的、有副本備份的日志來記錄數據庫數據變化的歷史,因此,你的應用可以隨時停止再重啟,而不會錯過它停止運行時發(fā)生的事件,保證了所有的事件都能被正確地、完全地處理掉。
監(jiān)控數據庫,并且在數據變動的時候獲得通知一直是很復雜的事情。關系型數據庫的觸發(fā)器可以做到,但是只對特定的數據庫有效,而且通常只能更新數據庫內的狀態(tài)(無法和外部的進程通信)。一些數據庫提供了監(jiān)控數據變動的API或者框架,但是沒有一個標準,每種數據庫的實現方式都是不同的,并且需要大量特定的知識和理解特定的代碼才能運用。確保以相同的順序查看和處理所有更改,同時最小化影響數據庫仍然非常具有挑戰(zhàn)性。
Debezium提供了模塊為你做這些復雜的工作。一些模塊是通用的,并且能夠適用多種數據庫管理系統,但在功能和性能方面仍有一些限制。另一些模塊是為特定的數據庫管理系統定制的,所以他們通常可以更多地利用數據庫系統本身的特性來提供更多功能。
github官網上羅列的一些典型應用場景 :
緩存失效(Cache invalidation) 經典問題 Redis與MySQL雙寫一致性如何保證?Debezium利用kafka單分區(qū)的有序性(忽略mysql binlog本身可能的延遲和亂序),可完全解決此問題。 在緩存中緩存的條目(entry)在源頭被更改或者被刪除的時候立即讓緩存中的條目失效。 如果緩存在一個獨立的進程中運行(例如Redis,Memcache,Infinispan或者其他的),那么簡單的緩存失效邏輯可以放在獨立的進程或服務中, 從而簡化主應用的邏輯。在一些場景中,緩存失效邏輯可以更復雜一點,讓它利用更改事件中的更新數據去更新緩存中受影響的條目。
簡化單體應用(Simplifying monolithic applications) 許多應用更新數據庫,然后在數據庫中的更改被提交后,做一些額外的工作:更新搜索索引,更新緩存,發(fā)送通知,運行業(yè)務邏輯,等等。 這種情況通常稱為雙寫(dual-writes),因為應用沒有在一個事務內寫多個系統。這樣不僅應用邏輯復雜難以維護, 而且雙寫容易丟失數據或者在一些系統更新成功而另一些系統沒有更新成功的時候造成不同系統之間的狀態(tài)不一致。使用捕獲更改數據技術(change data capture,CDC), 在源數據庫的數據更改提交后,這些額外的工作可以被放在獨立的線程或者進程(服務)中完成。這種實現方式的容錯性更好,不會丟失事件,容易擴展,并且更容易支持升級。
共享數據庫(Sharing databases) 當多個應用共用同一個數據庫的時候,一個應用提交的更改通常要被另一個應用感知到。一種實現方式是使用消息總線, 盡管非事務性(non-transactional)的消息總線總會受上面提到的雙寫(dual-writes)影響。但是,另一種實現方式,即Debezium,變得很直接:每個應用可以直接監(jiān)控數據庫的更改,并且響應更改。
數據集成(Data integration) 數據通常被存儲在多個地方,尤其是當數據被用于不同的目的的時候,會有不同的形式。保持多系統的同步是很有挑戰(zhàn)性的, 但是可以通過使用Debezium加上簡單的事件處理邏輯來實現簡單的ETL類型的解決方案。
命令查詢職責分離(CQRS) 在命令查詢職責分離 Command Query Responsibility Separation (CQRS) 架構模式中,更新數據使用了一種數據模型, 讀數據使用了一種或者多種數據模型。由于數據更改被記錄在更新側(update-side),這些更改將被處理以更新各種讀展示。 所以CQRS應用通常更復雜,尤其是他們需要保證可靠性和全序(totally-ordered)處理。Debezium和CDC可以使這種方式更可行: 寫操作被正常記錄,但是Debezium捕獲數據更改,并且持久化到全序流里,然后供那些需要異步更新只讀視圖的服務消費。 寫側(write-side)表可以表示面向領域的實體(domain-oriented entities),或者當CQRS和 Event Sourcing 結合的時候,寫側表僅僅用做追加操作命令事件的日志。
springboot 整合 Debezium
依賴
1.7.0.Final 8.0.26 mysql mysql-connector-java ${mysql.connector.version} runtime io.debezium debezium-api ${debezium.version} io.debezium debezium-embedded ${debezium.version} io.debezium debezium-connector-mysql ${debezium.version} mysql mysql-connector-java
注意debezium版本為1.7.0.Final,對應mysql驅動為8.0.26,低于這個版本會報兼容錯誤。
配置
相應的配置
debezium.datasource.hostname=localhost debezium.datasource.port=3306 debezium.datasource.user=root debezium.datasource.password=123456 debezium.datasource.tableWhitelist=test.test debezium.datasource.storageFile=E:/debezium/test/offsets/offset.dat debezium.datasource.historyFile=E:/debezium/test/history/custom-file-db-history.dat debezium.datasource.flushInterval=10000 debezium.datasource.serverId=1 debezium.datasource.serverName=name-1
然后進行配置初始化。
主要的配置項:
connector.class
監(jiān)控的數據庫類型,這里選mysql。
offset.storage
選擇FileOffsetBackingStore時,意思把讀取進度存到本地文件,因為我們不用kafka,當使用kafka時,選KafkaOffsetBackingStore 。
offset.storage.file.filename
存放讀取進度的本地文件地址。
offset.flush.interval.ms
讀取進度刷新保存頻率,默認1分鐘。如果不依賴kafka的話,應該就沒有exactly once只讀取一次語義,應該是至少讀取一次。意味著可能重復讀取。如果web容器掛了,最新的讀取進度沒有刷新到文件里,下次重啟時,就會重復讀取binlog。
table.whitelist
監(jiān)控的表名白名單,建議設置此值,只監(jiān)控這些表的binlog。
database.whitelist
監(jiān)控的數據庫白名單,如果選此值,會忽略table.whitelist,然后監(jiān)控此db下所有表的binlog。
/**
*@className:MysqlConfig
*@author:nyp
*@description:TODO
*@date:2023/8/713:53
*@version:1.0
*/
@Configuration
@ConfigurationProperties(prefix="debezium.datasource")
@Data
publicclassMysqlBinlogConfig{
privateStringhostname;
privateStringport;
privateStringuser;
privateStringpassword;
privateStringtableWhitelist;
privateStringstorageFile;
privateStringhistoryFile;
privateLongflushInterval;
privateStringserverId;
privateStringserverName;
@Bean
publicio.debezium.config.ConfigurationMysqlBinlogConfig()throwsException{
checkFile();
io.debezium.config.Configurationconfiguration=io.debezium.config.Configuration.create()
.with("name","mysql_connector")
.with("connector.class",MySqlConnector.class)
//.with("offset.storage",KafkaOffsetBackingStore.class)
.with("offset.storage",FileOffsetBackingStore.class)
.with("offset.storage.file.filename",storageFile)
.with("offset.flush.interval.ms",flushInterval)
.with("database.history",FileDatabaseHistory.class.getName())
.with("database.history.file.filename",historyFile)
.with("snapshot.mode","Schema_only")
.with("database.server.id",serverId)
.with("database.server.name",serverName)
.with("database.hostname",hostname)
//.with("database.dbname",dbname)
.with("database.port",port)
.with("database.user",user)
.with("database.password",password)
//.with("database.whitelist","test")
.with("table.whitelist",tableWhitelist)
.build();
returnconfiguration;
}
privatevoidcheckFile()throwsIOException{
Stringdir=storageFile.substring(0,storageFile.lastIndexOf("/"));
FiledirFile=newFile(dir);
if(!dirFile.exists()){
dirFile.mkdirs();
}
Filefile=newFile(storageFile);
if(!file.exists()){
file.createNewFile();
}
}
}
snapshot.mode 快照模式,指定連接器啟動時運行快照的條件。可能的設置有:
initial 只有在沒有為邏輯服務器名記錄偏移量時,連接器才運行快照。
When_needed 當連接器認為有必要時,它會在啟動時運行快照。也就是說,當沒有可用的偏移量時,或者當先前記錄的偏移量指定了服務器中不可用的binlog位置或GTID時。
Never 連接器從不使用快照。在第一次使用邏輯服務器名啟動時,連接器從binlog的開頭讀取。謹慎配置此行為。只有當binlog保證包含數據庫的整個歷史記錄時,它才有效。
Schema_only 連接器運行模式而不是數據的快照。當您不需要主題包含數據的一致快照,而只需要主題包含自連接器啟動以來的更改時,此設置非常有用。
Schema_only_recovery 這是已經捕獲更改的連接器的恢復設置。當您重新啟動連接器時,此設置允許恢復損壞或丟失的數據庫歷史主題。您可以定期將其設置為“清理”意外增長的數據庫歷史主題。數據庫歷史主題需要無限保留。
database.server.id
偽裝成slave的Debezium服務的id,自定義,有多個Debezium服務不能重復,如果重復的話會報以下異常。
io.debezium.DebeziumException:Aslavewiththesameserver_uuid/server_idasthisslavehasconnectedtothemaster;thefirstevent'binlog.000013'at46647257,thelasteventreadfrom'./binlog.000013'at125,thelastbytereadfrom'./binlog.000013'at46647257.Errorcode:1236;SQLSTATE:HY000. atio.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1167) atio.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1212) atcom.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980) atcom.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599) atcom.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857) atjava.lang.Thread.run(Thread.java:750) Causedby:com.github.shyiko.mysql.binlog.network.ServerException:Aslavewiththesameserver_uuid/server_idasthisslavehasconnectedtothemaster;thefirstevent'binlog.000013'at46647257,thelasteventreadfrom'./binlog.000013'at125,thelastbytereadfrom'./binlog.000013'at46647257. atcom.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:944) ...3commonframesomitted
監(jiān)聽
配置監(jiān)聽服務
/**
*@projectName:test
*@package:com.test.config
*@className:MysqlBinlogListener
*@author:nyp
*@description:TODO
*@date:2023/8/713:56
*@version:1.0
*/
@Component
@Slf4j
publicclassMysqlBinlogListener{
@Resource
privateExecutortaskExecutor;
privatefinalList>>engineList=newArrayList<>();
privateMysqlBinlogListener(@Qualifier("mysqlConnector")Configurationconfiguration){
this.engineList.add(DebeziumEngine.create(Json.class)
.using(configuration.asProperties())
.notifying(record->receiveChangeEvent(record.value()))
.build());
}
privatevoidreceiveChangeEvent(Stringvalue){
if(Objects.nonNull(value)){
Mappayload=getPayload(value);
Stringop=JSON.parseObject(JSON.toJSONString(payload.get("op")),String.class);
if(!(StringUtils.isBlank(op)||Envelope.Operation.READ.equals(op))){
ChangeDatachangeData=getChangeData(payload);
log.info("changeData="+changeData);
}
}
}
@PostConstruct
privatevoidstart(){
for(DebeziumEngine>engine:engineList){
taskExecutor.execute(engine);
}
}
@PreDestroy
privatevoidstop(){
for(DebeziumEngine>engine:engineList){
if(engine!=null){
try{
engine.close();
}catch(IOExceptione){
log.error("",e);
}
}
}
}
publicstaticMapgetPayload(Stringvalue){
Mapmap=JSON.parseObject(value,Map.class);
Mappayload=JSON.parseObject(JSON.toJSONString(map.get("payload")),Map.class);
returnpayload;
}
publicstaticChangeDatagetChangeData(Mappayload){
Mapsource=JSON.parseObject(JSON.toJSONString(payload.get("source")),Map.class);
returnChangeData.builder()
.op(payload.get("op").toString())
.table(source.get("table").toString())
.after(JSON.parseObject(JSON.toJSONString(payload.get("after")),Map.class))
.source(JSON.parseObject(JSON.toJSONString(payload.get("source")),Map.class))
.before(JSON.parseObject(JSON.toJSONString(payload.get("before")),Map.class))
.build();
}
@Data
@Builder
publicstaticclassChangeData{
/**
*更改前數據
*/
privateMapafter;
privateMapsource;
/**
*更改后數據
*/
privateMapbefore;
/**
*更改的表名
*/
privateStringtable;
/**
*操作類型,枚舉Envelope.Operation
*/
privateStringop;
}
}
將監(jiān)聽到的binlog日志封裝為ChangeData對象,包括表名,更改前后的數據,
以及操作類型
READ("r"),
CREATE("c"),
UPDATE("u"),
DELETE("d"),
TRUNCATE("t");
測試
update操作輸出
MysqlListener.ChangeData(after={
name=SuzukiMio2,
id=1
},source={
file=binlog.000013,
connector=mysql,
pos=42587833,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691458956000,
snapshot=false,
db=test
table=test
},before={
name=SuzukiMio,
id=1
},table=test,op=u)
data={
name=SuzukiMio2,
id=1
}
新增操作輸出
MysqlListener.ChangeData(after={
name=王五,
id=0
},source={
file=binlog.000013,
connector=mysql,
pos=42588175,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691459066000,
snapshot=false,
db=test,
table=test
},before=null,table=test,op=c)
刪除操作輸出
MysqlListener.ChangeData(after=null,source={
file=binlog.000013,
connector=mysql,
pos=42588959,
name=test-1,
row=0,
server_id=1,
version=1.7.0.Final,
ts_ms=1691459104000,
snapshot=false,
db=test
table=test
},before={
name=王五,
id=0
},table=test,op=d)
我們之前配置的保存讀取進度的文件storageFile,類似于kafka的偏移量,記錄的內容如下:

停止服務,對數據庫進行操作,再次重啟,會根據進度重新讀取。
小結
本文介紹了debezium,更多的時候,我們一談到CDC,第一想到的是大量數據同步的工具。 但其實也可以利用其數據變更捕獲的特性,來達到一部份消息隊列的作用。 但其畢竟不能完全替代消息隊列。大家理性看待與選擇。
本文的重點在介紹一種思路,具體的某項技術反而不那么重要。
審核編輯:劉清
-
JAVA語言
+關注
關注
0文章
138瀏覽量
21529 -
MYSQL數據庫
+關注
關注
0文章
97瀏覽量
10257 -
CDC技術
+關注
關注
0文章
9瀏覽量
7072
原文標題:不想引入MQ?試試debezium
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關注!文章轉載請注明出處。
發(fā)布評論請先 登錄
使用多片DAC61416芯片,如輸出50channel,這么多通道還能同時輸出嗎?
為什么OLED初始化的時候要這么多命令?
怎么記住這么多代碼格式?
為什么要搞這么多架構
為什么有這么多編程語言呢
這么多內網穿透工具怎么選?一篇讓你不再糾結的終極指南!
工業(yè)上面為什么有這么多通訊協議?
這么多技術框架,為什么選debezium?
評論