CAP与BASE
我们都知道,传统数据库事务具有ACID的特性,但在分布式环境下,追求强一致性在大多数情况下无法满足高性能需求。
分布式系统的CAP理论告诉我们,一致性、可用性、分区容忍性无法同时满足,最多只能满足其他两项。CAP理论描述如下:
- 一致性(Consistency):所有节点在同一时间读到同样的数据;
- 可用性(Availability):无论是成功还是失败,每个请求都能收到一个反馈。可用性强调的是服务可用,不保证数据的正确性;
- 分区容忍性(Partition-Tolerance):即使系统中有部分问题或者有消息的丢失,但系统仍然能够继续运行。分区容忍性强调的是集群对分区故障的容错能力;
对于分布式系统而言,分区故障无法避免,因而分区容忍性在一个分布式系统中是必须要考虑的,这也意味着,设计一个分布式系统,我们只能在CAP中的C、A中作出选择,而BASE就是在C和A的选择中作出的一种权衡。
BASE可以说是AP系统的一种延伸,其描述如下:
- 基本可用(Basically Available):基本可用,允许分区失败;
- 软状态(Soft state):软状态,一种中间状态,接受一段时间的状态不同步;
- 最终一致(Eventually consistent):最终一致,系统在短暂时间内可能是不一致的,但能够保证最终的数据是一致的;
BASE通过牺牲了强一致性来获取高可用性,允许数据存在短暂的不一致,到这里其实我们可以发现,ACID强调的是数据一致性,而BASE强调的则是服务可用;
基于可靠消息的最终一致性
基于可靠消息实现的分布式事务遵循了BASE理论,它通过引入消息中间件,在基于可靠消息的前提下,协商多个节点异步完成整个分布式事务,如果某一步骤出现失败时,则进行一定次数的重试,必要情况下需要人工介入处理。
以下单为例,用户下单后,会给用户增加一定的积分,在这个流程中涉及了订单服务以及积分服务,如下图所示:
但这个流程其实存在3个问题:
- 如果在订单服务的订单创建事务(1.2)执行成功了,在发送订单创建消息(1.3)后,由于网络不可达等因素,订单服务无法收到来自消息中间件的响应时,订单服务的本地事务是应该继续提交还是回滚呢;
- 如果订单创建消息发送成功,得到了消息中间件的正确响应,但订单服务的本地事务却提交失败了,但消息却已经投递出去了,这种情况又应该怎么处理呢;
- 订单创建消息已经成功投递到下游应用(积分服务),但积分服务的本地事务却执行失败了,又应该怎么处理呢
以上第1和第2个问题,其实可以归结于同一个问题,即如何保证消息发送的一致性,而第3个问题,即是如何确保消息一定能够消息成功;
如何保证消息发送的一致性
我们可以采用两阶段提交的方式,但并不是所有的消息中间件都支持XA,况且,出现问题的概率不大,为此引入了两阶段提交,性能方面就需要有所妥协,这样的方案实现是否有点得不偿失?
这个问题其实RocketMQ有了实现方案,RocketMQ支持事务消息,其通过引入了“半消息”的概念(半消息对下游系统不可见),保证了本地事务执行与消息发送的一致性。其发送的正向流程如下:
- 业务系统发送“半消息”;
- 消息中间件将“半消息”进行持久化;
- 业务系统得到消息发送结果,成功的话则执行本地事务,失败则结束流程;
- 本地事务执行,并将业务操作结果发送给消息中间件;
- 消息中间件根据业务系统的事务执行结果,选择将消息投放或者删除;
但实际上仅凭以上这个流程其实还是没有办法保证消息发送的一致性的,比如
(1) 在业务系统接收消息中间件对“半消息”的处理结果时,由于网络、或者业务系统和消息中间件自身故障时,业务系统无法得知消息中间件的处理结果时,就会按照消息发送失败来处理,这个时候,就有可能存在,’本地事务未执行,但“半消息”却发送成功的不一致情况’;
(2) 本地事务执行后,将业务操作结果返回到消息中间件时,如果出现问题,那么消息中间件将不知道如何处理已经存在的“半消息”;
为应对以上两种不一致情况,RocketMQ在消息中间件一方引入了“事务回查”的反向流程,其执行流程如下:
- 对于超过一定时长未处理的“半消息”,消息中间件将会回调业务系统询问本地事务的处理结果;
- 业务系统在检查事务操作结果,将结果返回给消息中间件;
- 消息中间件根据业务系统的事务执行结果,选择将消息投放或者删除;
发送消息的正向流程和反向流程结合起来,就是解决消息发送一致性的整个方案。
以上面的下单流程为例,在引入了RocketMQ事务消息后,基本流程可以表示为:
如何保证消息一定会被消费
确保消息一定能够被消息的流程比较简单,我们只需要保证消息的持久化,并引入消息确认机制,只有在消息被明确消费完成后,将确认消息返回到消息中间时,消息才能够被丢弃,否则则进行一定次数的重试,需要注意的是,引入重试操作后,消费方的操作需要保证幂等性。
RocketMQ整合实例
该例子基于Spring Boot构建:
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
|
applition.yml配置
1 2 3 4 5 6 7 8 9
| rocketmq: name-server: localhost:9876 producer: group: default-group sendMessageTimeout: 5000
demo: rocketmq: orderTopic: order-paid-topic
|
构建订单服务
发送半消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Service @Slf4j public class OrderServiceImpl implements OrderService {
@Value("${demo.rocketmq.orderTopic}") private String orderTopic;
@Autowired private RocketMQTemplate rocketMQTemplate;
@Override public void addOrder() { int orderId = new Random().nextInt(3) + 1; Message message = MessageBuilder.withPayload(orderId).build(); log.info("发送半消息, orderId = {}", orderId); rocketMQTemplate.sendMessageInTransaction(orderTopic, message, null); }
}
|
本地事务执行回调与事务状态回查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| @RocketMQTransactionListener @Slf4j public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { //执行本地事务 Integer orderId = Integer.valueOf(new String((byte[])msg.getPayload())); int status = new Random().nextInt(3); if (status == 0) { log.info("提交事务消息, orderId = {}", orderId); return RocketMQLocalTransactionState.COMMIT; }
if (status == 1) { log.info("回滚事务消息, orderId = {}", orderId); return RocketMQLocalTransactionState.ROLLBACK; }
log.info("事务消息中间状态, MQ需要回查事务状态"); return RocketMQLocalTransactionState.UNKNOWN; }
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { Integer orderId = Integer.valueOf(new String((byte[])msg.getPayload())); RocketMQLocalTransactionState retState; int status = new Random().nextInt(3); switch (status) { case 0: retState = RocketMQLocalTransactionState.UNKNOWN; break; case 1: retState = RocketMQLocalTransactionState.COMMIT; break; case 2: default: retState = RocketMQLocalTransactionState.ROLLBACK; break; } log.info("回查事务状态, orderId = {}, status = {}, retState = {}", orderId, status, retState); return retState; }
}
|
构建积分服务
1 2 3 4 5 6 7 8 9 10 11
| @Service @RocketMQMessageListener(topic = "${demo.rocketmq.orderTopic}", consumerGroup = "order_paid_consumer_default_group") @Slf4j public class OrderPaidConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { Integer orderId = Integer.valueOf(new String((byte[])message.getBody())); log.info("orderId = {}, 增加积分", orderId);
} }
|
参考:
《大型网站系统与Java中间件实践》