软件编程
位置:首页>> 软件编程>> java编程>> 图文并茂讲解RocketMQ消息类别

图文并茂讲解RocketMQ消息类别

作者:一个双子座的Java攻城狮  发布时间:2023-06-11 07:59:41 

标签:RocketMQ,消息类别,消息类型

1、同步消息

即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

图文并茂讲解RocketMQ消息类别

生产者:

public class Producer {
   public static void main(String[] args) throws Exception{
       DefaultMQProducer producer=new DefaultMQProducer("group1");
       producer.setNamesrvAddr("192.168.23.127:9876");
       producer.start();
       for (int i = 1; i <= 5; i++) {
           Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
           //同步消息发送
           SendResult result = producer.send(msg);
           System.out.println("返回结果:"+result);
       }
       producer.shutdown();
   }
}

消费者:

public class Consumer {
   public static void main(String[] args) throws Exception{
       DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
       consumer.setNamesrvAddr("192.168.23.127:9876");
       consumer.subscribe("topic2","*");
       consumer.registerMessageListener(new MessageListenerConcurrently() {
           @Override
           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
               for (MessageExt msg : list) {
                   //System.out.println("收到消息:"+msg);
                   System.out.println("消息:"+new String(msg.getBody()));
               }
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
           }
       });
       consumer.start();// 开启多线程 监控消息,持续运行
       System.out.println("接收消息服务已运行");
   }
}

测试:

图文并茂讲解RocketMQ消息类别

2、异步消息

即时性较弱,但需要有回执的消息,例如订单中的某些信息

图文并茂讲解RocketMQ消息类别

生产者:

public class Producer {
   public static void main(String[] args) throws Exception{
       DefaultMQProducer producer=new DefaultMQProducer("group1");
       producer.setNamesrvAddr("192.168.23.127:9876");
       producer.start();
       for (int i = 1; i <= 5; i++) {
           //异步消息发送
           Message msg = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8"));
           producer.send(msg, new SendCallback() {
               //表示成功返回结果
               @Override
               public void onSuccess(SendResult sendResult) {
                   System.out.println(sendResult);
               }
               //表示发送消息失败
               @Override
               public void onException(Throwable throwable) {
                   System.out.println(throwable);
               }
           });
       }
       //添加一个休眠操作,确保异步消息返回后能够输出
       TimeUnit.SECONDS.sleep(10);
       producer.shutdown();
   }
}

消费者:

public class Consumer {
   public static void main(String[] args) throws Exception{
       DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
       consumer.setNamesrvAddr("192.168.23.127:9876");
       consumer.subscribe("topic2","*");
       consumer.registerMessageListener(new MessageListenerConcurrently() {
           @Override
           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
               for (MessageExt msg : list) {
                   //System.out.println("收到消息:"+msg);
                   System.out.println("消息:"+new String(msg.getBody()));
               }
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
           }
       });
       consumer.start();// 开启多线程 监控消息,持续运行
       System.out.println("接收消息服务已运行");
   }
}

测试:

图文并茂讲解RocketMQ消息类别

3、单向消息

不需要有回执的消息,例如日志类消息

图文并茂讲解RocketMQ消息类别

生产者:

public class Producer {
   public static void main(String[] args) throws Exception{
       DefaultMQProducer producer=new DefaultMQProducer("group1");
       producer.setNamesrvAddr("192.168.23.127:9876");
       producer.start();
       for (int i = 1; i <= 5; i++) {
           //单向消息
           Message msg = new Message("topic2",("单向消息:hello rocketmq "+i).getBytes("UTF-8"));
           producer.sendOneway(msg);
       }
       //添加一个休眠操作,确保异步消息返回后能够输出
       TimeUnit.SECONDS.sleep(10);
       producer.shutdown();
   }
}

消费者代码同上

测试:

图文并茂讲解RocketMQ消息类别

总结 同步消息

SendResult result = producer.send(msg);

异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)

producer.send(msg, new SendCallback() {
               //表示成功返回结果
               @Override
               public void onSuccess(SendResult sendResult) {
                   System.out.println(sendResult);
               }
               //表示发送消息失败
               @Override
               public void onException(Throwable throwable) {
                   System.out.println(throwable);
               }
           });

单向消息

producer.sendOneway(msg);

来源:https://blog.csdn.net/weixin_64061088/article/details/128406813

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com