消息队列学习笔记
说下消息队列
消息队列本质上是一个以队列的方式进行通信的转发器组件,运行流程简答来说,就是生产者发送消息到队列中进行存储,然后消费者从队列中接收消息进行处理
常见的消息队列中间件有 Kafka、RocketMQ 等
消息队列主要应用在
- 系统之间需要解耦的场景,可以将原来通过网络的方式进行调用改为使用消息队列进行消息的异步通讯,如果其中一个系统崩溃了,并不会影响到其他系统,只是消息会积压在MQ中没有消费者进行消费;
- 还有需要进行异步操作的场景,比如用户创建一个订单后,需要通知商户、更新库存、创建历史订单记录等非同步操作,如果用户需要等待后面这些操作的完成才能继续下一步的操作的话,用户体验会比较糟糕,那么我们就可以把后面这些操作都放入消息队列中,用户在创建完订单后即可继续下一步操作,后面的操作则由其他系统去从消息队列中接收并消费,这样就可以大大提高了用户使用系统的效率;
- 然后也可以应用于削峰的场景,当短期内出现大量的访问流量超过系统负载时,系统就容易崩溃,那么,我们可以加一个消息队列中间件,先将请求放入到消息队列,然后系统以最大的处理能力去消费这些请求,这样就可以保证系统的稳定
如何解决重复消费的问题
我们可以在生产端在接收到消费成功后,进行唯一标识,在每次要发送消失时进行判断消息是否已被标识已完成,以此来保证幂等
如何解决消息丢失的问题
首先在生产者发送消息阶段,生产者是消息来源,所以不会发生丢失,之后再发送消息后,如果没有接收到消息中间件的 ack 确认响应的话,则会重新发送消息,直至接收到 ack 响应
然后在消息存储阶段中,可以通过集群的方式确保数据不丢失,集群上的每个节点都会存储一个消息的副本,即便其中一个节点挂了,也不影响后续消费者接收消息
最后在消费者需要在接收消息并完成处理后,才回复 ack 响应,那么在消费阶段这个消息就不会丢失,因为消息已经处理完毕了,如果是在接收到消息后就立刻返回 ack 响应的话,若在处理消息过程中出错了,就没法再重新接收到原消息了,这个消息也就被丢失了,因为消息队列接收到响应后就把消息从队列中移除了
如何保证消息的可靠性和顺序性
保证消息的可靠性可以通过
- 消息的持久化,将消息持久化到磁盘,在系统崩溃、重启或网络故障下,确保未处理的消息不会被丢失,在服务重启后,消息仍然可以被重新读取和处理;
- 也可以通过建立消息确认机制,在消费者成功处理消息后返回确认的响应,生产者只有在接收到确认后,才将消息从队列中移除,否则会在一定时间内重新发送给其他消费者或同一个消费者;
- 再而可以通过消息重试策略,在消费者处理消息失败时,可以进行每隔一段时间后进行重试,如果多次重试仍然失败,则将消息发送到死信队列中,以便后续问题排除或者特殊处理
要保证消息的顺序性,
- 首先需要识别有序消息处理的场景,明确业务中哪些消息需要保证顺序的
- 其次消息队列需要支持顺序性,确保消费者可以按照特定的顺序进行处理
- 最后消费者的处理策略需要避免并发处理可能导致的顺序被打乱的情况,可以通过单线程或线程池来对顺序消息进行串行处理,确保消息是按照正确的顺序被消费
如何处理消息积压问题
首先先排查消费者端是否是因为bug导致的,
- 如果不是,那么我们可以优化下消费逻辑,比如将原来一条一条消费改为批量消费,如果仍然存在积压问题,则可以考虑水平扩容,增加消费组机器数量
- 如果是,那么需要先修复bug,恢复其正常消费速度,然后停掉原来的消费者,接着是临时部署一个消费架构去消费积压的消息,在这个消费架构中 queue 是原来的 10 倍,征用 10 倍的机器来部署消费者,等待所有积压的消息消费完后就撤下这个架构
本文由作者按照 CC BY 4.0 进行授权