# 消息中间件“重复消费”、“顺序消费”问题
# 顺序消费问题
# rocketmq
- 问题引入
- 全局顺序消息:由于多队列写,多队列读,无法保证消息顺序
- 部分顺序消息:例如电商流程中,只需要保证订单生成、付费、发货部分顺序
- 解决方式
- 全局顺序消息:读、写队列全部设置成1,但这样会放弃高吞吐的优势(全局顺序场景其实不多,大多是部分顺序场景)
- 部分顺序消息:
- 发送端:使用MessageQueueSelector类来控制把消息发往哪个Message Queue
- 消费端:使用MessageListenerOrderly类来解决单Message Queue的消息被并发处理的问题
# kafka
- 问题引入
- kafka只能保证一个partition分区中的消息顺序,如果消费者从多个分区读取数据,则无法保证消费顺序
- 解决方式
- 单分区:通过kafka设置,将topic分区设置成1,同时也放弃了高吞吐
- 多分区:
- 消费端和生产端通过其他中间件保障(如:redis等),生产端维护发送消息顺序列表,消费端获取列表在消费消息时进行校验
- 消息体设计+消费端封装,生产端保障消息发送顺序(可以通过加锁等方式),消费端监听到消息后组装成任务队列进行多线程处理
如果对高吞吐要求不高,单分区方式即可;对于有高吞吐需求场景,需要在生产、消费端进行处理,结合业务实际
# 重复消费问题
# rocketmq
- 问题引入
- 对分布式消息队列来说,同时做到确保一定投递和不重复投递是很难的,也就是所谓的“有且仅有一次”。在鱼和熊掌不可兼得的情况下,RocketMQ选择了确保一定投递,保证消息不丢失,但有可能造成消息重复。
- 消息重复一般情况下不会发生,但是如果消息量大,网络有波动,消息重复就是个大概率事件。比如Producer有个函数setRetryTimesWhenSendFailed,设置在同步方式下自动重试的次数,默认值是2,这样当第一次发送消息时,Broker端接收到了消息但是没有正确返回发送成功的状态,就造成了消息重复。
- 解决方式
- 第一种方法是保证消费逻辑的幂等性(多次调用和一次调用效果相同)
- 另一种方法是维护一个已消费消息的记录,消费前查询这个消息是否被消费过
# kafka
为了避免重复消费消息,客户端需要主动跟踪Partition的偏移量,无论该偏移量最终持久化至客户端提供的外部存储介质上还是Kafka内部。
- 问题引入
- [1]Consumer 在消费过程中,应用进程被强制kill掉或发生异常退出
- 例如在一次poll500条消息后,消费到200条时,进程被强制kill消费导致offset 未提交,或出现异常退出导致消费到offset未提交。下次重启时,依然会重新拉取这500消息,这样就造成之前消费到200条消息重复消费了两次。
- [2]消费者消费时间过长
- max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。若消费者消费的消息比较耗时,那么这种情况可能就会出现。
- [1]Consumer 在消费过程中,应用进程被强制kill掉或发生异常退出
- 解决方式
- 对于[1]引入的问题:
- 在有消费者线程的应用中,应尽量避免使用kill -9这样强制杀进程的命令。对于服务可以通过 ShutdownHook 实现服务平滑停止,“Runtime.getRuntime().addShutdownHook(...)”
- 对于[2]引入的问题:
- 方式一
- 提高单条消息的消费能力
- 将 max.poll.interval.ms(offset自动提交间隔) 设置大一些,避免不必要的rebalance
- 将 max.poll.records(单次消费者拉取的最大数据条数) 设置小一些,默认值500
以上这些调整只能说尽可能避免,无法做到100%
- 方式二
- 引入单独去重机制,例如生成消息时,在消息中加入唯一标识符如消息id等。在消费端,我们可以保存最近的1000条消息id到redis或mysql表中,配置max.poll.records的值小于1000。在消费消息时先通过前置表去重后再进行消息的处理。
- 方式一
- 对于[1]引入的问题:
← 主流消息中间件对比 rocketmq 集群 →