本文共 5491 字,大约阅读时间需要 18 分钟。
我们整合boot项目的时候都是引入 xxx-start 依赖,但是现在大多数的整合RocketMQ都还不是这样。
我花了一天时间使用rocketmq-spring-boot-starter整合,使得操作起来更加简单。
4.0.0 www.xdx97.com RocketMQDemo 1.0-SNAPSHOT org.springframework.boot spring-boot 2.2.0.RELEASE org.springframework.boot spring-boot-starter-web 2.2.2.RELEASE org.apache.rocketmq rocketmq-spring-boot-starter 2.0.2 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.2
import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;/** * 生产者 */@RestControllerpublic class ProducerController { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 同步消息 * * @author 小道仙 * @date 2020年3月3日 */ @GetMapping("/syncNews") public String syncNews(){ Message message = new Message(); message.setBody("同步消息".getBytes()); SendResult sendResult = rocketMQTemplate.syncSend("sync-topic", message); // 同步消息发送成功会有一个返回值,我们可以用这个返回值进行判断和获取一些信息 System.out.println(sendResult); return "success"; } /** * 异步消息 * * @author 小道仙 * @date 2020年3月3日 */ @GetMapping("/asynNews") public String asynNews(){ Message message = new Message(); message.setBody("异步消息".getBytes()); rocketMQTemplate.asyncSend("asyn-topic", message, new SendCallback() { public void onSuccess(SendResult sendResult) { // 成功回调 } public void onException(Throwable throwable) { // 失败回调 } }); return "success"; } /** * 单向消息 * * @author 小道仙 * @date 2020年3月3日 */ @GetMapping("/sendOneWay") public String sendOneWay(){ Message message = new Message(); message.setBody("单向消息".getBytes()); rocketMQTemplate.sendOneWay("oneWay-topic",message); return "success"; }}
import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 异步消息 */@Service@RocketMQMessageListener(consumerGroup = "my-consumer_asyn-topic", topic = "asyn-topic")public class AsynConsumer implements RocketMQListener{ public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); System.out.println(new String(body)); }}
import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 单向消息 */@Service@RocketMQMessageListener(consumerGroup = "my-consumer_oneWay-topic", topic = "oneWay-topic")public class OneWayConsumer implements RocketMQListener{ public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); System.out.println(new String(body)); }}
import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * 同步消费 */@Service@RocketMQMessageListener(consumerGroup = "my-consumer_sync-topic", topic = "sync-topic")public class SyncConsumer implements RocketMQListener{ public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); System.out.println(new String(body)); }}
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class RocketMQApp { public static void main(String[] args) { SpringApplication.run(RocketMQApp.class, args); }}
server: port: 80rocketmq: name-server: xxxxxxx:9876 producer: send-message-timeout: 300000 group: rocketmq-group
ps:我们会看到打印的message信息,你可以使用JSON去格式化获取自己想要的信息。消息有短暂的延迟。
转载地址:http://twjj.baihongyu.com/