• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

Spring Boot 和RocketMQ 继承实战记录

其他 开心洋葱 1380次浏览 0个评论

RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。具有以下特性:

支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
支持拉(pull)和推(push)两种消息模式
单一队列百万消息的堆积能力
支持多种消息协议,如 JMS、MQTT 等
分布式高可用的部署架构,满足至少一次消息传递语义
提供 docker 镜像用于隔离测试和云集群部署
提供配置、指标和监控等功能丰富的 Dashboard
对于这些特性描述,大家简单过一眼就即可,深入学习之后自然就明白了。

先上代码:
1、单机部署RocketMQ服务器
2、创建Spring Boot 应用
3、application.properties配置

# 消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
# 生产者的组名
apache.rocketmq.producer.producerGroup=Producer
# NameServer地址
apache.rocketmq.namesrvAddr=192.168.56.101:9876

4、创建消费者Consumer

package com.example.rocketmqdemo;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * 描述: 消息消费者
 *
 * @author yanpenglei
 * @create 2018-02-01 18:11
 **/
@Component
public class Consumer {

    /**
     * 消费者的组名
     */
    @Value("${apache.rocketmq.consumer.PushConsumer}")
    private String consumerGroup;

    /**
     * NameServer地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQPushConsumer() {

        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        try {
            //订阅PushTopic下Tag为push的消息
            consumer.subscribe("PushTopic", "push");

            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            //如果非第一次启动,那么按照上次消费的位置继续消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt messageExt : list) {

                            System.out.println("messageExt: " + messageExt);//输出消息内容

                            String messageBody = new String(messageExt.getBody(), "utf-8");

                            System.out.println("消费响应:Msg: " + messageExt.getMsgId() + ",msgBody: " + messageBody);//输出消息内容

                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
                }


            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

5、创建生产者Producer

package com.example.rocketmqdemo;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 描述: 消息生产者
 *
 * @author yanpenglei
 * @create 2018-02-01 18:09
 **/
@Component
public class Producer {

    /**
     * 生产者的组名
     */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
     * NameServer 地址
     */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    @PostConstruct
    public void defaultMQProducer() {

        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);

        try {

            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            producer.start();

            for (int i = 0; i < 100; i++) {

                String messageBody = "我是消息内容:" + i;

                String message = new String(messageBody.getBytes(), "utf-8");

                //构建消息
                Message msg = new Message("PushTopic" /* PushTopic */, "push"/* Tag  */, "key_" + i /* Keys */, message.getBytes());

                //发送消息
                SendResult result = producer.send(msg);

                System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());

            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }

    }
}

6、配置POM依赖

 <!-- RocketMq客户端相关依赖 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.1.0-incubating</version>
        </dependency>

7、运行后会显示

....
发送响应:MsgId:C0A80BB34AD400B4AAC231AE016C0000,发送状态:SEND_OK
发送响应:MsgId:C0A80BB34AD400B4AAC231AE01C40001,发送状态:SEND_OK
发送响应:MsgId:C0A80BB34AD400B4AAC231AE01C80002,发送状态:SEND_OK
发送响应:MsgId:C0A80BB34AD400B4AAC231AE01CE0003,发送状态:SEND_OK
发送响应:MsgId:C0A80BB34AD400B4AAC231AE01D10004,发送状态:SEND_OK
发送响应:MsgId:C0A80BB34AD400B4AAC231AE01D50005,发送状态:SEND_OK
....
messageExt: MessageExt [queueId=1, storeSize=195, queueOffset=4, sysFlag=0, bornTimestamp=1525937487357, bornHost=/192.168.56.1:54068, storeTimestamp=1525937485933, storeHost=/192.168.56.101:10911, msgId=C0A8386500002A9F000000000002CACE, commitLogOffset=182990, bodyCRC=1023148443, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25, KEYS=key_16, CONSUME_START_TIME=1525937505444, UNIQ_KEY=C0A80BB34AD400B4AAC231AE01FD0010, WAIT=true, TAGS=push}, body=21]]
消费响应:Msg: C0A80BB34AD400B4AAC231AE01FD0010,msgBody: 我是消息内容:16
messageExt: MessageExt [queueId=2, storeSize=193, queueOffset=0, sysFlag=0, bornTimestamp=1525937487300, bornHost=/192.168.56.1:54068, storeTimestamp=1525937485876, storeHost=/192.168.56.101:10911, msgId=C0A8386500002A9F000000000002BF73, commitLogOffset=180083, bodyCRC=1888434740, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=PushTopic, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=25, KEYS=key_1, CONSUME_START_TIME=1525937505445, UNIQ_KEY=C0A80BB34AD400B4AAC231AE01C40001, WAIT=true, TAGS=push}, body=20]]
消费响应:Msg: C0A80BB34AD400B4AAC231AE01C40001,msgBody: 我是消息内容:1
...
消费响应:Msg: C0A80BB34AD400B4AAC231AE0220001A,msgBody: 我是消息内容:26
消费响应:Msg: C0A80BB34AD400B4AAC231AE02F7005B,msgBody: 我是消息内容:91
消费响应:Msg: C0A80BB34AD400B4AAC231AE02420022,msgBody: 我是消息内容:34

参考:
RocketMQ 架构深入浅谈
RocketMQ 多机集群部署模式思路
RocketMQ 单主部署

专业术语

Producer
消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。

Producer Group
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。在这里可以不用关心,只要知道有这么一个概念即可。

Consumer
消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。

Consumer Group
消费者组,和生产者类似,消费同一类消息的多个 consumer 实例组成一个消费者组。

Topic
Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。

Message
Message 是消息的载体。一个 Message 必须指定 topic,相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker 上的消息,方便在开发过程中诊断问题。

Tag
标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

Broker
Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。

Name Server
Name Server 为 producer 和 consumer 提供路由信息。


开心洋葱 , 版权所有丨如未注明 , 均为原创丨未经授权请勿修改 , 转载请注明Spring Boot 和RocketMQ 继承实战记录
喜欢 (0)

您必须 登录 才能发表评论!

加载中……