亲宝软件园·资讯

展开

ActiveMQ介绍与使用

苹果大大个 人气:0

一、什么是消息中间件

消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送

二、什么是ActiveMQ

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

三、什么时候需要用ActiveMQ

ActiveMQ常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验。例如以我在工作中的使用,在比较耗时且异步的远程开锁操作时

四、如何使用ActiveMQ

1.AcitveMQ的数据传送流程

2.ActiveMQ的两种消息传递类型

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。

(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据,与MQTT协议的实现是类似的,对MQTT协议有兴趣的可跳转文末查看

两种消息传递类型的不同,点对点传输消费者可以接收到在连接之前生产者所推送的数据,而基于发布/订阅模式的传输方式消费者只能接收到连接之后生产者推送的数据。

3.ActiveMQ的安装与启动

(1)官网下载对应服务器版本

(2)解压后进入apache-activemq-5.15.9/bin目录

(3)执行./activemq start启动ActiveMQ

(4)浏览器输入ActiveMQ启动的服务器ip:8161便可进入web界面,点击Manage ActiveMQ broker可以查看消息推送的状态,默认账号密码为admin,admin

(5)启动错误分析

进入/root/apache-activemq-5.15.9/data目录查看activemq.log文件,根据错误提示信息修改,例如端口号被占用等。

4.ActiveMQ的代码测试

(1)构建maven项目,引入依赖

<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.9.0</version>
        </dependency>

(2)生产者类

/**
 * @Description 生产者
 * @Date 2019/7/20
 * @Created by yqh
 */
public class MyProducer {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createQueue("myQueue");
        // 创建一个生产者
        MessageProducer producer = session.createProducer(destination);
        // 向队列推送10个文本消息数据
        for (int i = 1 ; i <= 10 ; i++){
            // 创建文本消息
            TextMessage message = session.createTextMessage("第" + i + "个文本消息");
            //发送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已发送的消息:" + message.getText());
        }
        //关闭连接
        connection.close();
    }

}

运行结果:

已发送的消息:第1个文本消息

已发送的消息:第2个文本消息

已发送的消息:第3个文本消息

已发送的消息:第4个文本消息

已发送的消息:第5个文本消息

已发送的消息:第6个文本消息

已发送的消息:第7个文本消息

已发送的消息:第8个文本消息

已发送的消息:第9个文本消息

已发送的消息:第10个文本消息

测试查看web后台显示,有10条消息在队列中等待消费

(3)消费者类

/**
 * @Description 消费者类
 * @Date 2019/7/20 0020
 * @Created by yqh
 */
public class MyConsumer {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createQueue("myQueue");
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        // 创建消费的监听
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

测试结果:

消费的消息:第1个文本消息

消费的消息:第2个文本消息

消费的消息:第3个文本消息

消费的消息:第4个文本消息

消费的消息:第5个文本消息

消费的消息:第6个文本消息

消费的消息:第7个文本消息

消费的消息:第8个文本消息

消费的消息:第9个文本消息

消费的消息:第10个文本消息

web后台显示有一个消费者处于连接状态,且已消费了10个message,而该条队列已没有message待消费了

(4)当我们运行两个消费者类,消息又是怎么被消费的呢?是两个消费者都能收到生产者生产的message,还是只有其中一个消费者能消费呢?

我们先运行两个消费者,在运行一个生产者对目标队列生产10个message,会发现有以下情况

// Consumer1控制台

消费的消息:第1个文本消息

消费的消息:第3个文本消息

消费的消息:第5个文本消息

消费的消息:第7个文本消息

消费的消息:第9个文本消息

// Consumer2控制台

消费的消息:第2个文本消息

消费的消息:第4个文本消息

消费的消息:第6个文本消息

消费的消息:第8个文本消息

消费的消息:第10个文本消息

即队列中的数据会平均的分给每一个消费者消费,且每一条数据只能被消费一次

(5)以上是基于队列点对点的传输类型,以下是基于发布/订阅模式传输的类型测试

/**
 * @Description 基于发布/订阅模式传输类型的生产者测试
 * @Date 2019/7/20 0020
 * @Created by yqh
 */
public class MyProducerForTopic {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createTopic("topicTest");
        // 创建一个生产者
        MessageProducer producer = session.createProducer(destination);
        // 向队列推送10个文本消息数据
        for (int i = 1 ; i <= 10 ; i++){
            // 创建文本消息
            TextMessage message = session.createTextMessage("第" + i + "个文本消息");
            //发送消息
            producer.send(message);
            //在本地打印消息
            System.out.println("已发送的消息:" + message.getText());
        }
        //关闭连接
        connection.close();
    }

}
/**
 * @Description 基于发布/订阅模式传输类型的消费者测试
 * @Date 2019/7/20 0020
 * @Created by yqh
 */
public class MyConsumerForTopic {

    private static final String ACTIVEMQ_URL = "tcp://192.168.168.242:61616";

    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        // 打开连接
        connection.start();
        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列目标,并标识队列名称,消费者根据队列名称接收数据
        Destination destination = session.createTopic("topicTest");
        // 创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        // 创建消费的监听
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

现在如果我们先启动生产者,再启动消费者,会发现消费者是无法接收到之前生产者之前所生产的数据,只有消费者先启动,再让生产者消费才可以正常接收数据,这也是发布/订阅的主题模式与点对点的队列模式的一个明显区别。

而如果启动两个消费者,那么每一个消费者都能完整的接收到生产者生产的数据,即每一条数据都被消费了两次,这是发布/订阅的主题模式与点对点的队列模式的另一个明显区别。

浅谈MQTT

1、什么是MQTT

MQTT的全称是“ Message Queuing Telemetry Transport”,即消息队列遥测传输,是一种基于订阅/发布模式的应用层协议,而http是一种基于restful风格的一种应用层协议。

MQTT协议是一种轻量级协议,作为一种低开销、低带宽占用的即时通讯协议,常被应用于物联网项目。同样基于订阅/发布模式的中间件有ActiveMQ,Kafka等消息中间件,归根结底实现的都是消息的传输。

2、如何理解MQTT

MQTT的是一种应用层协议,每一种协议都有其适用场景,而MQTT常被应用于消息推送,消息采集。例如温度检测仪器定时上传温度、检测矿洞氧气浓度等。

MQTT是基于TCP/IP的一种应用层协议,TCP/IP本身已实现了在不可靠的网络环境提供可靠的网络传输的功能,而MQTT协议也有其保障消息可靠传输的策略。

MQTT推送的消息有三种消息质量

1.至多一次,即消息只推送一次,至于消息有没有推送成功

2.至少一次,需要确认消息到达,可能会导致收到重复数据(注:MQTT定义的重发机制与tcp的重复机制是不同的,tcp的重复机制是在限定时间内如果没有收到对应序号的响应报文,则会重新推送该序列号对应的报文,而MQTT的重发机制是在客户端重新建立连接时,

补发之前没有对应响应报文的数据包,当然客户端可以选择是否要接收这些之前没有传输成功的数据包。最开始使用netty实现MQTT服务器的时候就理解错了,以为MQTT的重复机制与tcp的重复机制一样)

3.只有一次,确认消息只到达一次,常用于对数据要求严格的场景,例如计费场景,订单场景

3、如何使用MQTT

MQTT的客户端和服务端目前已有成熟的开源产品,例如服务端有emqx,客户端有Eclipse Paho Mqtt(Java),都可以方面的引入相应的库快速的实现推送功能(具体可根据需求查看对应的API)。

本质上来将是客户端与服务端建立一个Socket,然后根据MQTT协议规定发送响应的报,例如建立socket后发送connet报文去建立连接,然后服务器会解析该连接报文,并保存该连接的相关信息。

我们可以把MQTT协议的规定当成是我们实现web项目中所实现的业务逻辑。

4、MQTT协议的相关的名词解析

1.订阅(Subscription)

订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

2.、会话(Session)

每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

3.主题名(Topic Name)

连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

4.主题筛选器(Topic Filter)

一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

5.负载(Payload)

消息订阅者所具体接收的内容。

总结

加载全部内容

相关教程
猜你喜欢
用户评论