AspNetCore&MassTransit Courier實(shí)現(xiàn)分布式事務(wù)的詳細(xì)過(guò)程
目錄
- 分布式事務(wù)
- Saga模式
- 執(zhí)行過(guò)程
- 恢復(fù)策略
- 協(xié)作方式
- 編排式(Orchestrator)
- 協(xié)同式(Choreography)
- MassTransit Courier
- 補(bǔ)償服務(wù)
- 服務(wù)建立
- 服務(wù)配置
- 服務(wù)編排
- 執(zhí)行請(qǐng)求
- 執(zhí)行成功
- 執(zhí)行補(bǔ)償
- 參考文獻(xiàn)
在之前的一篇博文中,CAP框架可以方便我們實(shí)現(xiàn)非實(shí)時(shí)、異步場(chǎng)景下的最終一致性,而有些用例總是無(wú)法避免的需要在實(shí)時(shí)、同步場(chǎng)景下進(jìn)行,可以借助Saga事務(wù)來(lái)解決這一困擾。在一些博文和倉(cāng)庫(kù)中也搜尋到了.Net下實(shí)現(xiàn)Saga模式的解決方案MassTransit,這就省得自己再造輪子了。
分布式事務(wù)
分布式系統(tǒng)中,分布式事務(wù)是一個(gè)不能避免的問(wèn)題,如何保證不同節(jié)點(diǎn)間的數(shù)據(jù)一致性。舉個(gè)常見(jiàn)的例子,下訂單、減庫(kù)存、扣余額,三者在單個(gè)節(jié)點(diǎn)時(shí),可以借助本地事務(wù),實(shí)現(xiàn)要么成功要么失敗。而當(dāng)三者處于不同節(jié)點(diǎn)時(shí),又參雜了如網(wǎng)絡(luò)環(huán)境、節(jié)點(diǎn)自身環(huán)境、服務(wù)環(huán)境等各種因素,使得三個(gè)節(jié)點(diǎn)想要實(shí)現(xiàn)要么成功、要么失敗就增加了許多困難。
CAP理論和BASE理論很好的詮釋了這一問(wèn)題,也有了許多的解決分布式事務(wù)的方案,如2PC、3PC、TCC、本地消息表、Saga等一系列解決方案,面對(duì)不同場(chǎng)景、不同要求等可選擇不同的解決方案。
在之前提到過(guò)一個(gè)基于本地消息表的CAP框架,借助最終一致性很方便的解決了異步非實(shí)時(shí)請(qǐng)求下的分布式事務(wù),而對(duì)于大部分場(chǎng)景雖然可以直接或者妥協(xié)方式使用著異步非實(shí)時(shí),如同步實(shí)時(shí)場(chǎng)景的下訂單且減庫(kù)存變更到異步非實(shí)時(shí)場(chǎng)景的下訂單后發(fā)事件減庫(kù)存,但是總有那么一些場(chǎng)景,不得不去考慮同步實(shí)時(shí)請(qǐng)求下的分布式事務(wù)。
Saga模式
Saga模式又叫做長(zhǎng)時(shí)間運(yùn)行事務(wù)(Long-running-transaction), 由普林斯頓大學(xué)的 Hector Garcia-Molina和Kenneth Salem 1987年發(fā)表的論文《Sagas》。核心思想是將長(zhǎng)事務(wù)拆分為多個(gè)本地短事務(wù),通過(guò)保證所有短事務(wù)的成功或失敗來(lái)決定整體的成功或失敗,由Saga事務(wù)協(xié)調(diào)器協(xié)調(diào)管理,所有節(jié)點(diǎn)執(zhí)行成功,則成功,如有節(jié)點(diǎn)失敗,則反向執(zhí)行前置節(jié)點(diǎn)的補(bǔ)償操作。
- 每個(gè)Saga事務(wù)由一系列冪等的有序子事務(wù)(sub-transaction) Ti 組成。
- 每個(gè)Ti 都有對(duì)應(yīng)的冪等補(bǔ)償動(dòng)作Ci,補(bǔ)償動(dòng)作用于撤銷Ti造成的結(jié)果。
執(zhí)行過(guò)程
當(dāng)正常執(zhí)行時(shí),依照T1、T2、T3三個(gè)短事務(wù)正常執(zhí)行下去,直到最后一個(gè)Tn事務(wù)執(zhí)行完畢,宣告整個(gè)事務(wù)的成功。
而當(dāng)執(zhí)行到某個(gè)Tj出現(xiàn)故障時(shí),則反向補(bǔ)償之前的Tj-1..T1,每個(gè)對(duì)應(yīng)的補(bǔ)償操作Cj-1...C1,其中Tj事務(wù)由于在執(zhí)行階段就已失敗,所以Tj對(duì)應(yīng)的補(bǔ)償動(dòng)作Cj不需要執(zhí)行,即也確定了最后一個(gè)Tn事務(wù)可以不設(shè)置補(bǔ)償動(dòng)作Cn。
恢復(fù)策略
- 向前恢復(fù)(forward recovery):對(duì)于Ti事務(wù)的執(zhí)行,部分場(chǎng)景下可能因?yàn)閿?shù)據(jù)庫(kù)的連接、網(wǎng)絡(luò)的波動(dòng)等導(dǎo)致短暫的失敗,對(duì)Ti事務(wù)重試執(zhí)行,以確保整個(gè)事務(wù)的執(zhí)行,如執(zhí)行T1, T2, T3,當(dāng)執(zhí)行T3失敗時(shí),不直接宣告失敗,對(duì)T3執(zhí)行重試以排除部分不穩(wěn)定因素,如在若干次重試無(wú)效后,再考慮向后恢復(fù)。
- 向后恢復(fù)(backward recovery):按照?qǐng)?zhí)行順序方式作為向前的指向,則向后為反向補(bǔ)償,對(duì)已執(zhí)行過(guò)的節(jié)點(diǎn)順序倒退執(zhí)行各Ti的補(bǔ)償動(dòng)作Ci,也就是把走過(guò)的路往回走,對(duì)執(zhí)行過(guò)的操作執(zhí)行業(yè)務(wù)上的反操作,如正向流程執(zhí)行減庫(kù)存則補(bǔ)償操作時(shí)執(zhí)行加庫(kù)存。
協(xié)作方式
對(duì)于服務(wù)與服務(wù)間的協(xié)作,我們通常有兩種模式:Orchestration(編排式) 和 Choreography(協(xié)同式),在Saga模式中也有著這兩種的實(shí)現(xiàn)。
- 編排式(Orchestrator):把 Saga 的決策和執(zhí)行順序邏輯集中在一個(gè) Saga 編排器類中。Saga 編排器發(fā)出命令式消息給各個(gè) Saga 參與方,指示這些參與方服務(wù)完成具體操作(本地事務(wù))。
- 協(xié)同式(Choreography):把 Saga 的決策和執(zhí)行順序邏輯分布在 Saga 的每個(gè)參與方中,它們通過(guò)交換事件的方式來(lái)進(jìn)行溝通。
編排式與協(xié)同式的差異僅在于服務(wù)之間的協(xié)作方式,每個(gè)參與服務(wù)的接口定義卻沒(méi)有任何區(qū)別。
編排式(Orchestrator)
編排式的 Saga 需要開(kāi)發(fā)人員定義一個(gè)編排器類,用于編排一個(gè)Saga中多個(gè)參與服務(wù)執(zhí)行的流程。如果整個(gè)業(yè)務(wù)流程正常結(jié)束,業(yè)務(wù)就成功完成,一旦這個(gè)過(guò)程的任何環(huán)節(jié)出現(xiàn)失敗,Saga編排器類就會(huì)以相反的順序調(diào)用補(bǔ)償操作,重新進(jìn)行業(yè)務(wù)回滾。
對(duì)于每個(gè)參與的服務(wù)而言,需要做的事情是
- 訂閱并處理命令消息
- 執(zhí)行命令后返回響應(yīng)消息
- 設(shè)計(jì)執(zhí)行邏輯和補(bǔ)償邏輯
以提交訂單為例,假設(shè)場(chǎng)景是分布式系統(tǒng)下,進(jìn)程間以消息傳遞進(jìn)行通信:
1、事務(wù)發(fā)起方的主業(yè)務(wù)邏輯請(qǐng)求預(yù)先定義好的Saga編排器類(內(nèi)部編排了執(zhí)行順序)。
2、Saga編排器類向MQ發(fā)送減庫(kù)存事件,庫(kù)存服務(wù)訂閱事件、執(zhí)行處理并返回MQ處理結(jié)果。
3、Saga編排器類向MQ發(fā)送減余額事件,支付服務(wù)訂閱事件、執(zhí)行處理并返回MQ處理結(jié)果。
4、Saga編排器類向MQ發(fā)送創(chuàng)建訂單命令,訂單服務(wù)訂閱事件并按照命令創(chuàng)建訂單。
5、主業(yè)務(wù)邏輯接收并處理Saga編排器類處理結(jié)果。
6、整個(gè)過(guò)程由Saga 編排器類對(duì)接收到的回復(fù)進(jìn)行判決,來(lái)決定是繼續(xù)執(zhí)行還是懸崖勒馬。
協(xié)同式(Choreography)
沒(méi)有集中式的編排類,而是各參與方間相互訂閱,一個(gè)服務(wù)訂閱另一個(gè)服務(wù)的事件。
先由事務(wù)發(fā)起方執(zhí)行邏輯并發(fā)布一個(gè)事件,該事件被一個(gè)或多個(gè)服務(wù)進(jìn)行訂閱,這些服務(wù)執(zhí)行本地?cái)?shù)據(jù)庫(kù)操作并發(fā)布(或不發(fā)布)新的事件,該部分需要保證本地?cái)?shù)據(jù)庫(kù)的操作成功且寫(xiě)入MQ的消息也成功,可考慮使用本地消息表或是基于MQ事務(wù)。當(dāng)最后一個(gè)服務(wù)執(zhí)行本地事務(wù)并且不發(fā)布任何事件或者發(fā)布的事件沒(méi)有被任何Saga參與者訂閱意味著事務(wù)結(jié)束,則整個(gè)業(yè)務(wù)流程的分布式事務(wù)完成。如果某一服務(wù)出現(xiàn)故障,那么則反向發(fā)布事件,執(zhí)行補(bǔ)償操作,以此回滾。
以提交訂單為例,假設(shè)場(chǎng)景是分布式系統(tǒng)下,進(jìn)程間以消息傳遞進(jìn)行通信:
1、事務(wù)發(fā)起方執(zhí)行主業(yè)務(wù)邏輯發(fā)送提交訂單命令。
2、庫(kù)存服務(wù)訂閱事件、扣減庫(kù)存并發(fā)布已扣減事件。
3、訂單服務(wù)訂閱庫(kù)存已扣減事件,創(chuàng)建訂單并發(fā)布訂單已創(chuàng)建事件。
4、支付服務(wù)訂閱訂單已創(chuàng)建事件,執(zhí)行支付并發(fā)布訂單已支付事件。
5、主業(yè)務(wù)邏輯訂閱訂單已支付事件并處理。
當(dāng)某服務(wù)內(nèi)執(zhí)行時(shí)如存在異常,則反向發(fā)布事件,如訂單創(chuàng)建失敗,則發(fā)布OrderCreatedFailed事件,庫(kù)存服務(wù)訂閱該事件并執(zhí)行補(bǔ)償操作。
相比而言,編排式中參與服務(wù)無(wú)需向協(xié)同式中訂閱上游服務(wù)的事件,減少了服務(wù)間對(duì)事件協(xié)議的依賴,而只需要關(guān)心集權(quán)的編排器類發(fā)送的消息。
MassTransit Courier
補(bǔ)償服務(wù)
當(dāng)開(kāi)啟一個(gè)事務(wù)前,需要做一些準(zhǔn)備,準(zhǔn)備一個(gè)事務(wù)Id,記錄整個(gè)事務(wù)執(zhí)行情況,各Tj事務(wù)執(zhí)行情況,當(dāng)前請(qǐng)求上下文參數(shù),入?yún)?shù)記錄等,以方便執(zhí)行補(bǔ)償操作時(shí)需要用到。如當(dāng)Tj事務(wù)執(zhí)行失敗時(shí),需要對(duì)Cj-1到C1執(zhí)行補(bǔ)償操作,此時(shí)各補(bǔ)償操作需要一些正向執(zhí)行T1,Tj-1的請(qǐng)求參數(shù)或執(zhí)行結(jié)果,因此都需要記錄下來(lái)。
在Courier中,通過(guò)Routing Slip來(lái)完成這些記錄,創(chuàng)建一個(gè)Guid,記錄請(qǐng)求上下文參數(shù)信息,可以綁定幾個(gè)內(nèi)置事件,在各階段到來(lái)時(shí)會(huì)發(fā)送事件,如有需要可以訂閱。
var builder = new RoutingSlipBuilder(NewId.NextGuid());builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed);builder.AddVariable("RequestId", context.RequestId);builder.AddVariable("ResponseAddress", context.ResponseAddress);builder.AddVariable("FaultAddress", context.FaultAddress);builder.AddVariable("Request", context.Message);//組合一系列Activityvar routingSlip = builder.Build();await context.Execute(routingSlip).ConfigureAwait(false);
服務(wù)建立
弄了個(gè)Demo,建立了三個(gè)服務(wù),此處我使用編排式來(lái)完成,但無(wú)論是選用編排式還是協(xié)同式,都借助RabbitMQ實(shí)現(xiàn)消息傳遞。
每個(gè)服務(wù)都安裝了MassTransit相關(guān)的包
MassTransit.AspNetCoreMassTransit.RabbitMQ
將Saga編排器類放置在OrderService中了,對(duì)于編排器類的放置,個(gè)人認(rèn)為是應(yīng)該看用例的主服務(wù)是誰(shuí)而放置,想過(guò)放在BFF去協(xié)調(diào)三個(gè)服務(wù),但是總是感覺(jué)不是BFF的職責(zé)范圍。
服務(wù)配置
在各服務(wù)中對(duì)MassTransit配置,如下在OrderService中對(duì)MassTransit需要使用到的RabbitMQ配置,對(duì)需要進(jìn)行多個(gè)服務(wù)協(xié)作的用例配置Routing Slip,對(duì)消息隊(duì)列偵聽(tīng)訂閱需要的事件并配置相應(yīng)的Activity處理。
services.AddMassTransit(x =>{ var currentAssembly = Assembly.GetExecutingAssembly(); x.AddActivities(currentAssembly); x.AddConsumers(currentAssembly); x.AddRequestClient<createordercommand>(); x.UsingRabbitMq((context, cfg) => {// 配置RabbitMQcfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{ h.Username(Configuration["RabbitmqConfig:Username"]); h.Password(Configuration["RabbitmqConfig:Password"]);});//配置Routing Slipcfg.ReceiveEndpoint("CreateOrderCommand", ep =>{ ep.ConfigureConsumer<createorderrequestproxy>(context); ep.ConfigureConsumer<createorderresponseproxy>(context);});// 配置訂閱隊(duì)列及Handler處理cfg.ReceiveEndpoint("CreateOrder_execute", ep =>{ ep.ExecuteActivityHost<createorderactivity, createordermodel="">(context);}); });});services.AddMassTransitHostedService();
服務(wù)編排
構(gòu)建Routing Slip,此處依據(jù)用例的需求,對(duì)需要協(xié)作的服務(wù)編排,組合一系列的Activity。
Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<createordercommand> request){ builder.AddActivity("ReduceStock", new Uri("..."), new {}); builder.AddActivity("DeductBalance", new Uri("..."), new {}); builder.AddActivity("CreateOrder", new Uri("..."), new { }); return Task.CompletedTask;}
執(zhí)行請(qǐng)求
當(dāng)請(qǐng)求進(jìn)入后,通過(guò)RequestClient發(fā)送CreateOrderCommand,同步等待執(zhí)行結(jié)果,再由編排器類負(fù)責(zé)協(xié)調(diào)預(yù)設(shè)好的Activity,發(fā)送事件到消息隊(duì)列,經(jīng)各Activity訂閱處理最終返回結(jié)果。
[Route("[controller]")]public class OrderController : ControllerBase{ private readonly IRequestClient<createordercommand> _createOrderClient; public OrderController(IRequestClient<createordercommand> createOrderClient) {_createOrderClient = createOrderClient; } [HttpGet("CreateOrder")] public async Task<commoncommandresponse<createorderresult>> CreateOrder() {var result = await _createOrderClient.GetResponse<commoncommandresponse<createorderresult>>(new CreateOrderCommand(){ // ...});return result.Message; }}
各服務(wù)中對(duì)于Activity設(shè)置偵聽(tīng)隊(duì)列以及請(qǐng)求信息,調(diào)用Execute執(zhí)行邏輯,當(dāng)出現(xiàn)異常時(shí)返回到MQ通知編排器類,在對(duì)之前執(zhí)行的Activity執(zhí)行Compensate。如在CreateOrderActivity中執(zhí)行異常,由編排器類執(zhí)行補(bǔ)償,ReduceStockActivity調(diào)用Compensate,執(zhí)行增加庫(kù)存邏輯
public class ReduceStockActivity : IActivity<ReduceStockModel, ReduceStockLog>{ public async Task<ExecutionResult> Execute(ExecuteContext<ReduceStockModel> context) {var argument = context.Arguments;// 扣減庫(kù)存await Task.Delay(100);return context.Completed(new ReduceStockLog() { ProductId = argument.ProductId, Amount = 1 }); } public async Task<CompensationResult> Compensate(CompensateContext<ReduceStockLog> context) {// 增加庫(kù)存await Task.Delay(100);return context.Compensated(); }}
執(zhí)行成功
用例請(qǐng)求執(zhí)行后,先由Controller發(fā)送請(qǐng)求,再由庫(kù)存服務(wù)扣減庫(kù)存,支付服務(wù)扣減余額,最后由訂單服務(wù)創(chuàng)建訂單,當(dāng)創(chuàng)建失敗時(shí),執(zhí)行補(bǔ)償操作,庫(kù)存服務(wù)增加庫(kù)存,支付服務(wù)增加余額。
執(zhí)行補(bǔ)償
用例請(qǐng)求執(zhí)行后,先由Controller發(fā)送請(qǐng)求,再由庫(kù)存服務(wù)扣減庫(kù)存,支付服務(wù)扣減余額,最后由訂單服務(wù)創(chuàng)建訂單,當(dāng)創(chuàng)建失敗時(shí),執(zhí)行補(bǔ)償操作,庫(kù)存服務(wù)增加庫(kù)存,支付服務(wù)增加余額。
在整個(gè)事務(wù)失敗后,先會(huì)返回異常,再由編排器執(zhí)行補(bǔ)償操作,實(shí)現(xiàn)最終的數(shù)據(jù)一致性。MassTransit也提供了重試機(jī)制以實(shí)現(xiàn)向前恢復(fù),避免因數(shù)據(jù)庫(kù)連接超時(shí)、網(wǎng)絡(luò)波動(dòng)等問(wèn)題造成的失敗。
參考文獻(xiàn)
到此這篇關(guān)于AspNetCore&MassTransit Courier實(shí)現(xiàn)分布式事務(wù)的文章就介紹到這了,更多相關(guān)AspNetCore分布式事務(wù)內(nèi)容請(qǐng)搜索以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持!
