SpringBoot整合RabbitMQ实现消息延迟队列

环境依赖

SpringBoot 3.1.0

JDK 17

前期准备

安装MQ:  liunx+docker+rabbitmq安装延迟队列插件

实例

实现延迟队列的一种方式是在 RabbitMQ 中使用消息延迟插件,这个插件可以让你在消息发送时设置一个延迟时间,超过这个时间后消息才会被消费者接收到。下面是 SpringBoot 整合 RabbitMQ 实现延迟队列的简单步骤:

1.添加 RabbitMQ 的 Maven 依赖

<dependency>
    <groupId>org.springframework.bootgroupId>
    <artifactId>spring-boot-starter-amqpartifactId>
dependency>

2.配置 RabbitMQ

在 application.properties 配置文件中添加 RabbitMQ 的连接信息:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtual-host=/
# 手动应答
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#每次从队列中取一个,轮询分发,默认是公平分发
spring.rabbitmq.listener.simple.prefetch=1
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5

3.配置文件

@Configuration
public class RabbitMQOrderConfig {

    /**
     * 订单交换机
     */
    public static final String ORDER_EXCHANGE = "order_exchange";
    /**
     * 订单队列
     */
    public static final String ORDER_QUEUE = "order_queue";
    /**
     * 订单路由key
     */
    public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";

    /**
     * 死信交换机
     */
    public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
    /**
     * 死信队列 routingKey
     */
    public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";

    /**
     * 死信队列
     */
    public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";

    /**
     * 延迟时间 (单位:ms(毫秒))
     */
    public  static final Integer DELAY_TIME = 10000;


    /**
     * 创建死信交换机
     */
    @Bean("orderDeadLetterExchange")
    public Exchange orderDeadLetterExchange() {
        return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
    }

    /**
     * 创建死信队列
     */
    @Bean("orderDeadLetterQueue")
    public Queue orderDeadLetterQueue() {
        return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
    }

    /**
     * 绑定死信交换机和死信队列
     */
    @Bean("orderDeadLetterBinding")
    public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
    }


    /**
     * 创建订单交换机
     */
    @Bean("orderExchange")
    public Exchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE, true, false);
    }

    /**
     * 创建订单队列
     */
    @Bean("orderQueue")
    public Queue orderQueue() {
        Map args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);

        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);

        //过期时间,单位毫秒
        args.put("x-message-ttl", DELAY_TIME);

        return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
    }

    /**
     * 绑定订单交换机和队列
     */
    @Bean("orderBinding")
    public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
    }
}

4.定义消息实体类

定义一个消息体类,用来存储需要发送的消息:

@Slf4j
@Data
@Builder
public class OrderMessage implements Serializable {

    /**
     * 商户订单号
     */
    private String orderId;

    /**
     * 支付宝订单号
     */
    private String tradeNo;
}

5.定义消息发送者

定义一个 RabbitMQ 消息发送者类,用来发送消息到 RabbitMQ:

@Slf4j
@Component
public class MessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrderMessage(OrderMessage message) {
        //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定
        rabbitTemplate.setMandatory(true);
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
        rabbitTemplate.setReturnsCallback(returned -> {
            int code = returned.getReplyCode();
            System.out.println("code=" + code);
            System.out.println("returned=" + returned);
        });
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", message);

        log.info("===============延时队列生产消息====================");
        log.info("发送时间:{},发送内容:{}, {}ms后执行", LocalDateTime.now(), message, RabbitMQConfig.DELAY_TIME);
    }
}

6.定义消息消费者

定义一个 RabbitMQ 消息消费者类,用来接收并处理消息:

@Component
@Slf4j
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {

    @RabbitHandler
    public void consumer(OrderMessage orderMessage, Message message, Channel channel) throws IOException {
        log.info("收到消息:{}",new Date());
        log.info("msgTag:{}", message.getMessageProperties().getDeliveryTag());
        log.info("message:{}", message);
        log.info("content:{}", orderMessage);
    }
}

这里使用了 @RabbitListener 注解来将一个方法标记为一个 RabbitMQ 消息监听器,通过设置 queues 属性来指定监听的队列名称。

7.定义一个controller

@Slf4j
@Api(tags = "延迟消息接口")
@RestController
@RequestMapping("/rabbitmq_order_delay_message")
public class RabbitMQDelayMessageController {

    @Autowired
    private MessageSender sender;

    /**
     * 发送消息
     * @return
     */
    @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)
    @ResponseBody
    public void sendMsg() {
        OrderMessage orderMessage = OrderMessage.builder().orderId(UUID.randomUUID().toString()).tradeNo(UUID.randomUUID().toString()).build();
        sender.sendOrderMessage(orderMessage);
    }
}

启动项目,请求运行结果:

总的xml:

    org.springframework.boot
    spring-boot-starter



    com.xiaoleilu
    hutool-all
    3.0.7



    io.swagger
    swagger-annotations
    ${swagger-annotations.version}



    org.springframework.boot
    spring-boot-starter-test
    test



    org.springframework.boot
    spring-boot-starter-amqp


    org.projectlombok
    lombok


    org.springframework.boot
    spring-boot-starter-web



    com.alibaba
    fastjson
    1.2.73
    compile

问题总结

1.Invalid argument, ‘x-delayed-type’ must be an existing exchange type

需要创建一个交换机

2.Connection refused: no further information

请检查配置  application.xml配置的rabbimq不生效,可以将配置放到application.properties

3.Channel shutdown: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED – unknown delivery tag 1, class-id=60, method-id=80)

这种情况:

1.消费者内部重复签收导致签收异常

解决方案:增加配置手动处理应答

 1) 配置新增

spring.rabbitmq.listener.simple.acknowledge-mode=manual #手动签收

  2) 代码里: 增加channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

    public void consumer(String body, Message message, Channel channel) throws IOException {
        long msgTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("收到消息:" + new Date());
            System.out.println("msgTag=" + msgTag);
            System.out.println("message=" + message);
            System.out.println("body=" + body);
            channel.basicAck(msgTag, false);
        }catch (Exception e) {
            log.error("【订单延迟关闭处理异常】 接收到消息为:" + msgTag + " ,消息异常消费 : ", e);
        } finally {
            // 处理完之后手动签收(这里再次签收)
            channel.basicAck(msgTag, false);
        }
    }

2.已经是自动处理了,然后代码里还有手动处理channel.basicAck(msgTag, false)

解决方案:去除channel.basicAck(msgTag, false)

4.Failed to convert message

消息发送和接收的方式不对 比如发送的是对象,则接收的也必须是对象,发送的是string ,接收的也必须是string

阅读全文
下载说明:
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.shuli.cc/?p=14757,转载请注明出处。
0

评论0

显示验证码
没有账号?注册  忘记密码?