MQ消息积压以及处理方案
这种时候只能操作临时扩容,以更快的速度去消费数据了
- 增加多个消费者,加速消费
- 新建topic引流,将消息引导别的程序中,洪水一个道理。
如何保证消息不被丢失
生产者丢失消息
这里我们有两种方法可以避免RabbitMQ丢失消息
- 开启RabbitMQ的事务功能,就是生产者在发送数据之前开启事务,然后发送消
息,如果消息没有成功被rabbitmq接收到,那么生产者会受到异常报错,这时就可以回滚事务,然后尝试重新发送;如果收到了消息,那么就可以提交事务.但这种事务模式,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太
耗性能会造成吞吐量的下降。 - 可以开启confirm模式。在生产者哪里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了rabbitmq之中,rabbitmq会给你回传一个ack消息,告诉你这个消息发送OK了;如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发.
一般采用confirm模式
RabbitMQ自己丢失消息
- 设置消息持久化到磁盘。设置持久化有两个步骤:
- 创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里面的数据。
- 发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时rabbitmq就会将消息持久化到磁盘上。
必须要同时开启这两个才可以。
- 而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前rabbitmq挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
消费者丢失消息
使用rabbitmq提供的ack机制,首先关闭rabbitmq的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack
怎么保证MQ顺序性?
RabbitMQ
拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
Q.E.D.