- 工信部備案號 滇ICP備05000110號-1
- 滇公安備案 滇53010302000111
- 增值電信業務經營許可證 B1.B2-20181647、滇B1.B2-20190004
- 云南互聯網協會理事單位
- 安全聯盟認證網站身份V標記
- 域名注冊服務機構許可:滇D3-20230001
- 代理域名注冊服務機構:新網數碼
代碼分析事務消息
分布式事務模型圖如下:
我們先看事務消息客戶端的實現。即上圖的MQClient.
我們先看代碼的整體封裝。
下面代碼段做了兩件事:
1.本地數據庫寫入 2.事務消息客戶端發送消息。
@Transactional表示開啟事務。在注釋下,開啟一個新的Order。用OrderDao將數據寫入本地數據庫。然后調用transactionMsgClient.sendMsg將消息發出去(這是靜態方法調用,transactionMsgClient是一個類,sendMsg是它的方法)。這樣,本地數據庫寫入和發消息這兩件事,就是個原子事務,也就是說,兩件事要么一起成功,要么一起失敗。
上面代碼用到了MybatisTransactionMsgClient。MyBatis是一個Java持久化框架,它通過XML描述符或注解把對象與存儲過程或SQL語句關聯起來,映射成數據庫內對應的記錄。
上面提到過,transactionMsgClient.sendMsg是個類,這個類繼承了TransactionMsgClient。下面代碼中,transactionMsgClient.sendMsg調用了其父類的TransactionsendClient的sendMsg方法,寫事務消息表,并且發送消息。
我們接下來看sendMsg這個方法到底做了什么:
1、消息內容落數據庫 2、發送消息
下面代碼會先做一個判斷,在if字段里:con.getAutoCommit。也就是說,只有當沒有開啟自動commit的時候(有自動提交就破壞了事務的原子性),把信息寫在數據庫表里,然后構造一個messages,發消息。而發消息的方法是將消息放到消息隊列中。
在事務消息設計中,后臺發送消息隊列設計,如下圖所示:
參照上圖,我們可以看到后臺發送消息隊列有兩個:
SendMsg隊列的消息消費很快,基本上放進去很快就會被消費掉。這樣重試才能有效,否則一直重試,沒有意義。
下面代碼段的的作用:是發送消息時,從隊列里中取消息,看是否到期,將到期的消息取出來:
后臺優先隊列的維護。
最后,事務消息表的設計;
代碼分析事務消息
我們知道,RocketMQ支持延時消息。我們先看一下延時消息的應用場景。
延時需求是在當前時間后的某一時間點觸發指定的業務邏輯或操作。
例如微信發消息,過一段時間沒發送成功的話,提示重新發送(如下圖的小紅點提示)。
訂單狀態流轉:如延時支付。京東上超過24小時的訂單會被自動取消。
實際上,我們在分布式事務的終極模型中,也用到了延時消息。
RocketMQ支持18個級別的延時等級:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 6h 12h。也就是說,消息延時發送有這18個級別。如果業務的延時消息需求與這18個級別不匹配,就需要自行基于RocketMQ進行二次開發。
接下來,我們看一下18個延時的實現原理。
RocketMQ的18個延時級別,每個級別對應一個Queue,根據Level參數,將消息放到對應的18個隊列中的等級。每個隊列都對應了到時出隊。例如1s隊列就是1s出隊,2小時隊就是2小時出隊。消息出隊以后,當成正常消息投遞。然后被投遞到了消費隊列,被消費者消費掉。
售前咨詢
售后咨詢
備案咨詢
二維碼
TOP