欢迎光临企业官网建站网站,提供自助建设系统平台服务

企业官网建站

专业为公司品牌推广建设网站

java B2B2C 源码 Springcloud多租户电子商城系统- Stream重新入队(RabbitMQ)

作者:jcmp      发布时间:2021-04-25      浏览量:0
@EnableBinding(TestA

@EnableBinding(TestApplication.TestTopic.class)@SpringBootApplicationpublic class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生产接口 * * @param message * @return */ @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).build()); return "ok"; } } /** * 消息消费逻辑 */ @Slf4j @Component static class TestListener { private int count = 1; @StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info("Received payload : " + payload + ", " + count); throw new RuntimeException("Message consumer failed!"); } } interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); }}

内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,比如:

spring.cloud.stream.bindings.example-topic-input.destination=test-topicspring.cloud.stream.bindings.example-topic-input.group=stream-exception-handlerspring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=truespring.cloud.stream.bindings.example-topic-output.destination=test-topic。

完成了上面配置之后,启动应用并访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了,此时可以看到程序不断的抛出了消息消费异常。这是由于这里我们多加了一个配置:spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true。在该配置作用之下,消息消费失败之后,并不会将该消息抛弃,而是将消息重新放入队列,所以消息的消费逻辑会被重复执行,直到这条消息消费成功为止。 java B2B2C 源码 Springcloud多租户电子商城系统。