本文共 2550 字,大约阅读时间需要 8 分钟。
在分布式系统架构中,消息队列的核心职责是为不同的应用系统提供异步通信服务,通常涉及以下三个重要角色:
• 消息发布者,发送消息的应用系统,负责创建消息对象并通过网络发布到消息Broker,发布的过程一般是同步的。 • 消息Broker,异步消息的“代理人”,负责接收并持久化消息,保证将消息投递到指定的消息订阅者应用系统。 • 消息订阅者,订阅消息的应用系统,负责消费消息Broker投递过来的消息。 异步消息队列如果需求是 “变更Case”,要实现的基本业务逻辑如下:
“变更Case” 消息队列设计方案存在一个严重问题: “变更” 关联的数据库变更事务提交成功后,如果 “发布账单变更消息” 发送失败(例如网络异常或者消息队列服务不可用),则会导致 “记录用户行为” 和 “短信通知用户” 后续动作失败,无法完成风险控制数据积累,用户也无法及时获取到账户变更信息。
为了解决这个严重问题,初步考虑先发布 “变更” 消息,再做数据库变更的设计方案。但还是无法解决 “消息发布” 和 “数据库事务” 可能不一致性的严重问题,如果消息已发布成功过了,数据库变更事务回滚了,就会导致用户的账单没有变更,但用户却收到了账户变更短信,存在一致性漏洞的 “变更Case” 消息队列设计方案如下所示事务型消息是否被投递与发送端系统本地数据库事务保持一致,如果本地数据库事务提交则消息会被投递给订阅端;如果本地数据库事务回滚,则直接丢弃消息不投递给订阅端系统。
事务型消息回查——两阶段消息流程
事务型消息流程的第一阶段是消息发布端发送消息到可靠消息组件,第二阶段是消息发布端发送提交或者回滚指令到可靠消息组件,可靠消息组件根据此指令决定是否投递消息到订阅端系统。当第二阶段指令出现异常时,可靠消息组件在一定时间后主动回查消息发送端系统,确认对应的事务型消息是否投递。 按照 “事务型消息设计方案E” 的时序图,消息发布者和消息队列之间增加了一个 “二阶段” 消息,用来标明对应事务型消息的 “事务状态”,消息队列根据 “二阶段” 消息决定是否投递消息到下游消息订阅者。应用 “事务型消息”,“账单变更Case” 的可行解决方案如下所示: 至于依据数据库事务提交/回滚状态决定事务型 “二阶段” 消息的发送,可以通过Spring Framework提供的事务模板同步器自动感知消息发布者本地事务状态,相关接口是: 按照 “账单变更Case” 消息队列-事务型消息设计方案 ,可以满足“账单服务数据库变更”与“异步消息是否投递到订阅者应用”的事务一致性需求。结合Spring Framework的事务模板工具类伪代码如下:transactionTemplate.execute(new TransactionCallback() { @Override public Object doInTransaction(TransactionStatus status) { try { messageQueueSDK.publishTransactionMessage(message); dbService.doUpdateOperation(); } catch (Exception e) { status.setRollbackOnly(); } return null; }});
至此,消息队列 “事务型消息” 的设计方案和实现原理基本阐明清楚了,还遗留两个可以深究的关键点:
• 推模式push:由消息中间件主动发消息给消费者
• 拉模式pull:消费者主动从消息中间件拉取消息 • 比较:采用push模式,可以尽可能快的把消息发给消费者,但是如果消费者处理一条消息能力较弱(处理时间长),消息中间件会不断的发消息给消费者,到时消费者的缓存区溢出;采用pull模式,可能会增加消息的延迟。 • 消息订阅者收到的消息不保证有序,即收到消息的顺序与发布者发送消息的顺序可能会不一致 • 消息投递策略为至少一次,即对于同一条消息消息订阅者可能收到多次,要求订阅者保证幂等特性转载地址:http://sbrsi.baihongyu.com/