博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot整合RocketMQ,三种测试附带源码【rocketmq-spring-boot-starter】
阅读量:157 次
发布时间:2019-02-28

本文共 5491 字,大约阅读时间需要 18 分钟。

我们整合boot项目的时候都是引入 xxx-start 依赖,但是现在大多数的整合RocketMQ都还不是这样。

我花了一天时间使用rocketmq-spring-boot-starter整合,使得操作起来更加简单。

1、说明

1-1:rocketmq-spring-boot-starter 提供了一个 rocketMQTemplate 使得发消息更加简单,它底层也还是基于DefaultMQProducer 但是封装之后使用更加方便了。

1-2:rocketmq-spring-boot-starter使用监听器的方式获取消息更加的简单

1-3:下面代码提供三种基础的消息发送测试【同步、异步、单向】你只需要修改yml文件里面的RocketMQ地址就好了

1-4:更多内容可以查看官网文档

2、代码

在这里插入图片描述

2-1:pom

4.0.0
www.xdx97.com
RocketMQDemo
1.0-SNAPSHOT
org.springframework.boot
spring-boot
2.2.0.RELEASE
org.springframework.boot
spring-boot-starter-web
2.2.2.RELEASE
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.2
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.2

2-1:ProducerController

import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;/** * 生产者 */@RestControllerpublic class ProducerController {
@Autowired private RocketMQTemplate rocketMQTemplate; /** * 同步消息 * * @author 小道仙 * @date 2020年3月3日 */ @GetMapping("/syncNews") public String syncNews(){
Message message = new Message(); message.setBody("同步消息".getBytes()); SendResult sendResult = rocketMQTemplate.syncSend("sync-topic", message); // 同步消息发送成功会有一个返回值,我们可以用这个返回值进行判断和获取一些信息 System.out.println(sendResult); return "success"; } /** * 异步消息 * * @author 小道仙 * @date 2020年3月3日 */ @GetMapping("/asynNews") public String asynNews(){
Message message = new Message(); message.setBody("异步消息".getBytes()); rocketMQTemplate.asyncSend("asyn-topic", message, new SendCallback() {
public void onSuccess(SendResult sendResult) {
// 成功回调 } public void onException(Throwable throwable) {
// 失败回调 } }); return "success"; } /** * 单向消息 * * @author 小道仙 * @date 2020年3月3日 */ @GetMapping("/sendOneWay") public String sendOneWay(){
Message message = new Message(); message.setBody("单向消息".getBytes()); rocketMQTemplate.sendOneWay("oneWay-topic",message); return "success"; }}

2-1:AsynConsumer

import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 异步消息 */@Service@RocketMQMessageListener(consumerGroup = "my-consumer_asyn-topic", topic = "asyn-topic")public class AsynConsumer implements RocketMQListener
{
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody(); System.out.println(new String(body)); }}

2-1:OneWayConsumer

import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 单向消息 */@Service@RocketMQMessageListener(consumerGroup = "my-consumer_oneWay-topic", topic = "oneWay-topic")public class OneWayConsumer implements RocketMQListener
{
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody(); System.out.println(new String(body)); }}

2-1:SyncConsumer

import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 同步消费 */@Service@RocketMQMessageListener(consumerGroup = "my-consumer_sync-topic", topic = "sync-topic")public class SyncConsumer implements RocketMQListener
{
public void onMessage(MessageExt messageExt) {
byte[] body = messageExt.getBody(); System.out.println(new String(body)); }}

2-2:RocketMQApp

import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class RocketMQApp {
public static void main(String[] args) {
SpringApplication.run(RocketMQApp.class, args); }}

2-2:application

server:  port: 80rocketmq:  name-server: xxxxxxx:9876  producer:    send-message-timeout: 300000    group: rocketmq-group

3、测试

3-1:http://localhost/syncNews

3-2:http://localhost/asynNews

3-3:http://localhost/sendOneWay

ps:我们会看到打印的message信息,你可以使用JSON去格式化获取自己想要的信息。消息有短暂的延迟。

4、如果需要源码可以关注微信公众号回复: RocketMQDemo

在这里插入图片描述

转载地址:http://twjj.baihongyu.com/

你可能感兴趣的文章