国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

AspNetCore&MassTransit Courier實(shí)現(xiàn)分布式事務(wù)的詳細(xì)過(guò)程

瀏覽:285日期:2022-06-09 11:41:00
目錄
  • 分布式事務(wù)
  • Saga模式
    • 執(zhí)行過(guò)程
    • 恢復(fù)策略
    • 協(xié)作方式
      • 編排式(Orchestrator)
      • 協(xié)同式(Choreography)
  • MassTransit Courier
    • 補(bǔ)償服務(wù)
      • 服務(wù)建立
        • 服務(wù)配置
          • 服務(wù)編排
            • 執(zhí)行請(qǐng)求
              • 執(zhí)行成功
              • 執(zhí)行補(bǔ)償
          • 參考文獻(xiàn)

            在之前的一篇博文中,CAP框架可以方便我們實(shí)現(xiàn)非實(shí)時(shí)、異步場(chǎng)景下的最終一致性,而有些用例總是無(wú)法避免的需要在實(shí)時(shí)、同步場(chǎng)景下進(jìn)行,可以借助Saga事務(wù)來(lái)解決這一困擾。在一些博文和倉(cāng)庫(kù)中也搜尋到了.Net下實(shí)現(xiàn)Saga模式的解決方案MassTransit,這就省得自己再造輪子了。

            分布式事務(wù)

            分布式系統(tǒng)中,分布式事務(wù)是一個(gè)不能避免的問(wèn)題,如何保證不同節(jié)點(diǎn)間的數(shù)據(jù)一致性。舉個(gè)常見(jiàn)的例子,下訂單、減庫(kù)存、扣余額,三者在單個(gè)節(jié)點(diǎn)時(shí),可以借助本地事務(wù),實(shí)現(xiàn)要么成功要么失敗。而當(dāng)三者處于不同節(jié)點(diǎn)時(shí),又參雜了如網(wǎng)絡(luò)環(huán)境、節(jié)點(diǎn)自身環(huán)境、服務(wù)環(huán)境等各種因素,使得三個(gè)節(jié)點(diǎn)想要實(shí)現(xiàn)要么成功、要么失敗就增加了許多困難。

            CAP理論和BASE理論很好的詮釋了這一問(wèn)題,也有了許多的解決分布式事務(wù)的方案,如2PC、3PC、TCC、本地消息表、Saga等一系列解決方案,面對(duì)不同場(chǎng)景、不同要求等可選擇不同的解決方案。

            數(shù)據(jù)一致性容錯(cuò)性復(fù)雜性性能維護(hù)成本2PC強(qiáng)低中低低3PC強(qiáng)低高低中TCC弱高高中高本地消息表弱高低中中MQ事務(wù)弱高低高中Saga事務(wù)弱高高中高

            在之前提到過(guò)一個(gè)基于本地消息表的CAP框架,借助最終一致性很方便的解決了異步非實(shí)時(shí)請(qǐng)求下的分布式事務(wù),而對(duì)于大部分場(chǎng)景雖然可以直接或者妥協(xié)方式使用著異步非實(shí)時(shí),如同步實(shí)時(shí)場(chǎng)景的下訂單且減庫(kù)存變更到異步非實(shí)時(shí)場(chǎng)景的下訂單后發(fā)事件減庫(kù)存,但是總有那么一些場(chǎng)景,不得不去考慮同步實(shí)時(shí)請(qǐng)求下的分布式事務(wù)。

            Saga模式

            Saga模式又叫做長(zhǎng)時(shí)間運(yùn)行事務(wù)(Long-running-transaction), 由普林斯頓大學(xué)的 Hector Garcia-Molina和Kenneth Salem 1987年發(fā)表的論文《Sagas》。核心思想是將長(zhǎng)事務(wù)拆分為多個(gè)本地短事務(wù),通過(guò)保證所有短事務(wù)的成功或失敗來(lái)決定整體的成功或失敗,由Saga事務(wù)協(xié)調(diào)器協(xié)調(diào)管理,所有節(jié)點(diǎn)執(zhí)行成功,則成功,如有節(jié)點(diǎn)失敗,則反向執(zhí)行前置節(jié)點(diǎn)的補(bǔ)償操作。

            • 每個(gè)Saga事務(wù)由一系列冪等的有序子事務(wù)(sub-transaction) Ti 組成。
            • 每個(gè)Ti 都有對(duì)應(yīng)的冪等補(bǔ)償動(dòng)作Ci,補(bǔ)償動(dòng)作用于撤銷Ti造成的結(jié)果。

            執(zhí)行過(guò)程

            當(dāng)正常執(zhí)行時(shí),依照T1、T2、T3三個(gè)短事務(wù)正常執(zhí)行下去,直到最后一個(gè)Tn事務(wù)執(zhí)行完畢,宣告整個(gè)事務(wù)的成功。

            而當(dāng)執(zhí)行到某個(gè)Tj出現(xiàn)故障時(shí),則反向補(bǔ)償之前的Tj-1..T1,每個(gè)對(duì)應(yīng)的補(bǔ)償操作Cj-1...C1,其中Tj事務(wù)由于在執(zhí)行階段就已失敗,所以Tj對(duì)應(yīng)的補(bǔ)償動(dòng)作Cj不需要執(zhí)行,即也確定了最后一個(gè)Tn事務(wù)可以不設(shè)置補(bǔ)償動(dòng)作Cn。

            恢復(fù)策略

            • 向前恢復(fù)(forward recovery):對(duì)于Ti事務(wù)的執(zhí)行,部分場(chǎng)景下可能因?yàn)閿?shù)據(jù)庫(kù)的連接、網(wǎng)絡(luò)的波動(dòng)等導(dǎo)致短暫的失敗,對(duì)Ti事務(wù)重試執(zhí)行,以確保整個(gè)事務(wù)的執(zhí)行,如執(zhí)行T1, T2, T3,當(dāng)執(zhí)行T3失敗時(shí),不直接宣告失敗,對(duì)T3執(zhí)行重試以排除部分不穩(wěn)定因素,如在若干次重試無(wú)效后,再考慮向后恢復(fù)。

            • 向后恢復(fù)(backward recovery):按照?qǐng)?zhí)行順序方式作為向前的指向,則向后為反向補(bǔ)償,對(duì)已執(zhí)行過(guò)的節(jié)點(diǎn)順序倒退執(zhí)行各Ti的補(bǔ)償動(dòng)作Ci,也就是把走過(guò)的路往回走,對(duì)執(zhí)行過(guò)的操作執(zhí)行業(yè)務(wù)上的反操作,如正向流程執(zhí)行減庫(kù)存則補(bǔ)償操作時(shí)執(zhí)行加庫(kù)存。

            協(xié)作方式

            對(duì)于服務(wù)與服務(wù)間的協(xié)作,我們通常有兩種模式:Orchestration(編排式) 和 Choreography(協(xié)同式),在Saga模式中也有著這兩種的實(shí)現(xiàn)。

            • 編排式(Orchestrator):把 Saga 的決策和執(zhí)行順序邏輯集中在一個(gè) Saga 編排器類中。Saga 編排器發(fā)出命令式消息給各個(gè) Saga 參與方,指示這些參與方服務(wù)完成具體操作(本地事務(wù))。
            • 協(xié)同式(Choreography):把 Saga 的決策和執(zhí)行順序邏輯分布在 Saga 的每個(gè)參與方中,它們通過(guò)交換事件的方式來(lái)進(jìn)行溝通。

            編排式與協(xié)同式的差異僅在于服務(wù)之間的協(xié)作方式,每個(gè)參與服務(wù)的接口定義卻沒(méi)有任何區(qū)別。

            編排式(Orchestrator)

            編排式的 Saga 需要開(kāi)發(fā)人員定義一個(gè)編排器類,用于編排一個(gè)Saga中多個(gè)參與服務(wù)執(zhí)行的流程。如果整個(gè)業(yè)務(wù)流程正常結(jié)束,業(yè)務(wù)就成功完成,一旦這個(gè)過(guò)程的任何環(huán)節(jié)出現(xiàn)失敗,Saga編排器類就會(huì)以相反的順序調(diào)用補(bǔ)償操作,重新進(jìn)行業(yè)務(wù)回滾。

            對(duì)于每個(gè)參與的服務(wù)而言,需要做的事情是

            • 訂閱并處理命令消息
            • 執(zhí)行命令后返回響應(yīng)消息
            • 設(shè)計(jì)執(zhí)行邏輯和補(bǔ)償邏輯

            以提交訂單為例,假設(shè)場(chǎng)景是分布式系統(tǒng)下,進(jìn)程間以消息傳遞進(jìn)行通信:

            1、事務(wù)發(fā)起方的主業(yè)務(wù)邏輯請(qǐng)求預(yù)先定義好的Saga編排器類(內(nèi)部編排了執(zhí)行順序)。

            2、Saga編排器類向MQ發(fā)送減庫(kù)存事件,庫(kù)存服務(wù)訂閱事件、執(zhí)行處理并返回MQ處理結(jié)果。

            3、Saga編排器類向MQ發(fā)送減余額事件,支付服務(wù)訂閱事件、執(zhí)行處理并返回MQ處理結(jié)果。

            4、Saga編排器類向MQ發(fā)送創(chuàng)建訂單命令,訂單服務(wù)訂閱事件并按照命令創(chuàng)建訂單。

            5、主業(yè)務(wù)邏輯接收并處理Saga編排器類處理結(jié)果。

            6、整個(gè)過(guò)程由Saga 編排器類對(duì)接收到的回復(fù)進(jìn)行判決,來(lái)決定是繼續(xù)執(zhí)行還是懸崖勒馬。

            協(xié)同式(Choreography)

            沒(méi)有集中式的編排類,而是各參與方間相互訂閱,一個(gè)服務(wù)訂閱另一個(gè)服務(wù)的事件。

            先由事務(wù)發(fā)起方執(zhí)行邏輯并發(fā)布一個(gè)事件,該事件被一個(gè)或多個(gè)服務(wù)進(jìn)行訂閱,這些服務(wù)執(zhí)行本地?cái)?shù)據(jù)庫(kù)操作并發(fā)布(或不發(fā)布)新的事件,該部分需要保證本地?cái)?shù)據(jù)庫(kù)的操作成功且寫(xiě)入MQ的消息也成功,可考慮使用本地消息表或是基于MQ事務(wù)。當(dāng)最后一個(gè)服務(wù)執(zhí)行本地事務(wù)并且不發(fā)布任何事件或者發(fā)布的事件沒(méi)有被任何Saga參與者訂閱意味著事務(wù)結(jié)束,則整個(gè)業(yè)務(wù)流程的分布式事務(wù)完成。如果某一服務(wù)出現(xiàn)故障,那么則反向發(fā)布事件,執(zhí)行補(bǔ)償操作,以此回滾。

            以提交訂單為例,假設(shè)場(chǎng)景是分布式系統(tǒng)下,進(jìn)程間以消息傳遞進(jìn)行通信:

            1、事務(wù)發(fā)起方執(zhí)行主業(yè)務(wù)邏輯發(fā)送提交訂單命令。

            2、庫(kù)存服務(wù)訂閱事件、扣減庫(kù)存并發(fā)布已扣減事件。

            3、訂單服務(wù)訂閱庫(kù)存已扣減事件,創(chuàng)建訂單并發(fā)布訂單已創(chuàng)建事件。

            4、支付服務(wù)訂閱訂單已創(chuàng)建事件,執(zhí)行支付并發(fā)布訂單已支付事件。

            5、主業(yè)務(wù)邏輯訂閱訂單已支付事件并處理。

            當(dāng)某服務(wù)內(nèi)執(zhí)行時(shí)如存在異常,則反向發(fā)布事件,如訂單創(chuàng)建失敗,則發(fā)布OrderCreatedFailed事件,庫(kù)存服務(wù)訂閱該事件并執(zhí)行補(bǔ)償操作。

            相比而言,編排式中參與服務(wù)無(wú)需向協(xié)同式中訂閱上游服務(wù)的事件,減少了服務(wù)間對(duì)事件協(xié)議的依賴,而只需要關(guān)心集權(quán)的編排器類發(fā)送的消息。

            MassTransit Courier

            補(bǔ)償服務(wù)

            當(dāng)開(kāi)啟一個(gè)事務(wù)前,需要做一些準(zhǔn)備,準(zhǔn)備一個(gè)事務(wù)Id,記錄整個(gè)事務(wù)執(zhí)行情況,各Tj事務(wù)執(zhí)行情況,當(dāng)前請(qǐng)求上下文參數(shù),入?yún)?shù)記錄等,以方便執(zhí)行補(bǔ)償操作時(shí)需要用到。如當(dāng)Tj事務(wù)執(zhí)行失敗時(shí),需要對(duì)Cj-1到C1執(zhí)行補(bǔ)償操作,此時(shí)各補(bǔ)償操作需要一些正向執(zhí)行T1,Tj-1的請(qǐng)求參數(shù)或執(zhí)行結(jié)果,因此都需要記錄下來(lái)。

            在Courier中,通過(guò)Routing Slip來(lái)完成這些記錄,創(chuàng)建一個(gè)Guid,記錄請(qǐng)求上下文參數(shù)信息,可以綁定幾個(gè)內(nèi)置事件,在各階段到來(lái)時(shí)會(huì)發(fā)送事件,如有需要可以訂閱。

            var builder = new RoutingSlipBuilder(NewId.NextGuid());builder.AddSubscription(context.ReceiveContext.InputAddress, RoutingSlipEvents.Completed | RoutingSlipEvents.Faulted | RoutingSlipEvents.CompensationFailed);builder.AddVariable("RequestId", context.RequestId);builder.AddVariable("ResponseAddress", context.ResponseAddress);builder.AddVariable("FaultAddress", context.FaultAddress);builder.AddVariable("Request", context.Message);//組合一系列Activityvar routingSlip = builder.Build();await context.Execute(routingSlip).ConfigureAwait(false);

            服務(wù)建立

            弄了個(gè)Demo,建立了三個(gè)服務(wù),此處我使用編排式來(lái)完成,但無(wú)論是選用編排式還是協(xié)同式,都借助RabbitMQ實(shí)現(xiàn)消息傳遞。

            每個(gè)服務(wù)都安裝了MassTransit相關(guān)的包

            MassTransit.AspNetCoreMassTransit.RabbitMQ

            將Saga編排器類放置在OrderService中了,對(duì)于編排器類的放置,個(gè)人認(rèn)為是應(yīng)該看用例的主服務(wù)是誰(shuí)而放置,想過(guò)放在BFF去協(xié)調(diào)三個(gè)服務(wù),但是總是感覺(jué)不是BFF的職責(zé)范圍。

            服務(wù)配置

            在各服務(wù)中對(duì)MassTransit配置,如下在OrderService中對(duì)MassTransit需要使用到的RabbitMQ配置,對(duì)需要進(jìn)行多個(gè)服務(wù)協(xié)作的用例配置Routing Slip,對(duì)消息隊(duì)列偵聽(tīng)訂閱需要的事件并配置相應(yīng)的Activity處理。

            services.AddMassTransit(x =>{    var currentAssembly = Assembly.GetExecutingAssembly();    x.AddActivities(currentAssembly);    x.AddConsumers(currentAssembly);    x.AddRequestClient<createordercommand>();    x.UsingRabbitMq((context, cfg) =>    {// 配置RabbitMQcfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>{    h.Username(Configuration["RabbitmqConfig:Username"]);    h.Password(Configuration["RabbitmqConfig:Password"]);});//配置Routing Slipcfg.ReceiveEndpoint("CreateOrderCommand", ep =>{    ep.ConfigureConsumer<createorderrequestproxy>(context);    ep.ConfigureConsumer<createorderresponseproxy>(context);});// 配置訂閱隊(duì)列及Handler處理cfg.ReceiveEndpoint("CreateOrder_execute", ep =>{    ep.ExecuteActivityHost<createorderactivity, createordermodel="">(context);});    });});services.AddMassTransitHostedService();

            服務(wù)編排

            構(gòu)建Routing Slip,此處依據(jù)用例的需求,對(duì)需要協(xié)作的服務(wù)編排,組合一系列的Activity。

            Task BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<createordercommand> request){    builder.AddActivity("ReduceStock", new Uri("..."), new {});    builder.AddActivity("DeductBalance", new Uri("..."), new {});    builder.AddActivity("CreateOrder", new Uri("..."), new { });    return Task.CompletedTask;}

            執(zhí)行請(qǐng)求

            當(dāng)請(qǐng)求進(jìn)入后,通過(guò)RequestClient發(fā)送CreateOrderCommand,同步等待執(zhí)行結(jié)果,再由編排器類負(fù)責(zé)協(xié)調(diào)預(yù)設(shè)好的Activity,發(fā)送事件到消息隊(duì)列,經(jīng)各Activity訂閱處理最終返回結(jié)果。

            [Route("[controller]")]public class OrderController : ControllerBase{    private readonly IRequestClient<createordercommand> _createOrderClient;    public OrderController(IRequestClient<createordercommand> createOrderClient)    {_createOrderClient = createOrderClient;    }    [HttpGet("CreateOrder")]    public async Task<commoncommandresponse<createorderresult>> CreateOrder()    {var result = await _createOrderClient.GetResponse<commoncommandresponse<createorderresult>>(new CreateOrderCommand(){    // ...});return result.Message;    }}

            各服務(wù)中對(duì)于Activity設(shè)置偵聽(tīng)隊(duì)列以及請(qǐng)求信息,調(diào)用Execute執(zhí)行邏輯,當(dāng)出現(xiàn)異常時(shí)返回到MQ通知編排器類,在對(duì)之前執(zhí)行的Activity執(zhí)行Compensate。如在CreateOrderActivity中執(zhí)行異常,由編排器類執(zhí)行補(bǔ)償,ReduceStockActivity調(diào)用Compensate,執(zhí)行增加庫(kù)存邏輯

            public class ReduceStockActivity : IActivity<ReduceStockModel, ReduceStockLog>{    public async Task<ExecutionResult> Execute(ExecuteContext<ReduceStockModel> context)    {var argument = context.Arguments;// 扣減庫(kù)存await Task.Delay(100);return context.Completed(new ReduceStockLog() { ProductId = argument.ProductId, Amount = 1 });    }    public async Task<CompensationResult> Compensate(CompensateContext<ReduceStockLog> context)    {// 增加庫(kù)存await Task.Delay(100);return context.Compensated();    }}

            執(zhí)行成功

            用例請(qǐng)求執(zhí)行后,先由Controller發(fā)送請(qǐng)求,再由庫(kù)存服務(wù)扣減庫(kù)存,支付服務(wù)扣減余額,最后由訂單服務(wù)創(chuàng)建訂單,當(dāng)創(chuàng)建失敗時(shí),執(zhí)行補(bǔ)償操作,庫(kù)存服務(wù)增加庫(kù)存,支付服務(wù)增加余額。

            執(zhí)行補(bǔ)償

            用例請(qǐng)求執(zhí)行后,先由Controller發(fā)送請(qǐng)求,再由庫(kù)存服務(wù)扣減庫(kù)存,支付服務(wù)扣減余額,最后由訂單服務(wù)創(chuàng)建訂單,當(dāng)創(chuàng)建失敗時(shí),執(zhí)行補(bǔ)償操作,庫(kù)存服務(wù)增加庫(kù)存,支付服務(wù)增加余額。

            在整個(gè)事務(wù)失敗后,先會(huì)返回異常,再由編排器執(zhí)行補(bǔ)償操作,實(shí)現(xiàn)最終的數(shù)據(jù)一致性。MassTransit也提供了重試機(jī)制以實(shí)現(xiàn)向前恢復(fù),避免因數(shù)據(jù)庫(kù)連接超時(shí)、網(wǎng)絡(luò)波動(dòng)等問(wèn)題造成的失敗。

            參考文獻(xiàn)

            到此這篇關(guān)于AspNetCore&amp;MassTransit Courier實(shí)現(xiàn)分布式事務(wù)的文章就介紹到這了,更多相關(guān)AspNetCore分布式事務(wù)內(nèi)容請(qǐng)搜索以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持!

            標(biāo)簽: ASP.NET
            主站蜘蛛池模板: 国产午夜亚洲精品理论片不卡 | 欧美日韩精品一区二区在线线 | 最新国产精品亚洲 | 亚洲国产欧美目韩成人综合 | 一级毛片成人免费看免费不卡 | 午夜毛片免费观看视频 | 久久一本一区二区三区 | 精品一区二区三区免费观看 | 成年人黄色网址 | 99精品视频99| 亚洲1314| 国产免费一区二区在线看 | 老色99久久九九精品尤物 | a级片在线 | 国产一级二级三级视频 | 亚洲高清在线观看看片 | 国产乱理片在线观看夜 | 久久久影院亚洲精品 | 成人影院在线免费观看 | 亚洲高清视频在线播放 | 国产成年人网站 | 亚洲男同视频网站 | 国产成人咱精品视频免费网站 | 久久香蕉国产视频 | 久久综合日韩亚洲精品色 | 欧美高清亚洲欧美一区h | 毛片视频网站在线观看 | 一级做a爱片特黄在线观看免费看 | 精品视频在线观看一区二区三区 | 国产精品欧美亚洲日本综合 | 超清首页 国产 亚洲 丝袜 | 一级视频网站 | 精品国产v | 国产高清视频a在线大全 | 五月色一区二区亚洲小说 | 国产精品成人久久久 | 欧美一级毛片不卡免费观看 | 亚洲精品一区二区三区美女 | 国产综合精品一区二区 | 亚洲欧美激情精品一区二区 | 精品一区二区三区免费毛片爱 |