`

ActiveMQ启动与实现

    博客分类:
  • MQ
 
阅读更多
ActiveMQ服务器启动与java简单实现发送与接收

1.下载ActiveMQ
去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ
windows:
解压缩apache-activemq-5.14.2-bin.zip,然后双击apache-activemq-5.14.2\bin\win64\activemq.bat运行ActiveMQ程序。


centos:
下载activemq:
wget http://archive.apache.org/dist/activemq/5.13.5/apache-activemq-5.13.5-bin.tar.gz  

启动activemq:
[root@centos64 *] cd  /home/hailong/activemq/apache-activemq-5.13.5/bin  
[root@centos64 bin]./activemq start 



启动ActiveMQ以后,登陆:http://localhost:8161/admin/,用户名密码在apache-activemq-5.14.2\conf\jetty-realm.properties文件里。

什么情况下使用ActiveMQ?
多个项目之间集成
(1) 跨平台 pts->wms
(2) 多语言
(3) 多项目
降低系统间模块的耦合度,解耦
(1) 软件扩展性
系统前后端隔离
(1) 前后端隔离,屏蔽高安全区
举例:电商中,添加商品库存,商品下单后到OMS推送

3.在ActiveMQ服务器创建一个Queue


4.ActiviteMQ消息有3中形式



(1)、点对点方式(point-to-point)
点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行
(2)、发布/订阅 方式(publish/subscriber Messaging)
发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

当采用点对点模型时,消息将发送到一个队列,该队列的消息只能被一个消费者消费。而采用发布订阅模型时,消息可以被多个消费者消费,且时间上的依赖,一个接收端只能接收他创建以后客户端发送的信息。也就是说在发布订阅模型中,生产者和消费者完全独立,不需要感知对方的存在。

5.ActiviteMQ接收和发送消息基本流程


发送消息的基本步骤:
(1)、创建连接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection 建立会话Session
(4)、使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)、使用消息生产者MessageSender发送消息

消息接收者从JMS接受消息的步骤
(1)、创建连接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection 建立会话Session
(4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。

6.简单代码实现
发送方:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        /* Connection :JMS 客户端到JMS Provider 的连接 */
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // MessageProducer:消息发送者
        MessageProducer producer;
        // TextMessage message;
        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.TRUE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            destination = session.createQueue("SecondQueue");
            // 得到消息生成者【发送者】
            producer = session.createProducer(destination);
            // 设置不持久化,此处学习,实际根据项目决定
            // PERSISTENT:保存到磁盘,consumer消费之后,message被删除。
            // NON_PERSISTENT:保存到内存,消费之后message被清除。
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 构造消息,此处写死,项目就是参数,或者方法获取
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer)
            throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session
                    .createTextMessage("ActiveMq 发送的消息" + i);
            // 发送消息到目的地方
            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
            producer.send(message);
        }
    }
}


接收方:
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Destination;
import javax.jms.TextMessage;
import javax.jms.DeliveryMode;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                //设置接收者接收消息的时间,为了便于测试,这里谁定为1s
                TextMessage message = (TextMessage) consumer.receive(10000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}


7.JMS的主要特性
1.面向Java平台的标准消息传递API
2.在Java或JVM语言比如Scala、Groovy中具有互用性
3.无需担心底层协议
4.有queues和topics两种消息传递模型
5.支持事务
6.能够定义消息格式(消息头、属性和内容)

JMS或者NMS都没有标准的底层协议。它们可以在任何底层协议上运行,但是API是与编程语言绑定的。AMQP(RabbitMQ是一个开源的AMQP实现)解决了这个问题,它使用了一套标准的底层协议,加入了许多其他特征来支持互用性,为现代应用丰富了消息传递需求。

  • 大小: 77.6 KB
  • 大小: 95.1 KB
  • 大小: 288.9 KB
  • 大小: 15.6 KB
  • 大小: 16.3 KB
分享到:
评论

相关推荐

    activemq+swing 实现的聊天工具(可执行文件和完整工程源码

    activemq+swing 实现的聊天工具(可执行文件和完整工程源码, activemq+swing 实现的聊天工具(可执行文件和完整工程源码) 可执行文件右键运行即可 执行Start.java可启动程序

    activemq, Apache ActiveMQ镜像.zip

    activemq, Apache ActiveMQ镜像 欢迎来到 Apache ActiveMQis是一个高性能的Apache 2.0许可以消息代理和 JMS 1.1实现。正在启动要帮助你入门,请尝试以下链接:入门http://activemq.apache.org/version-

    jms基础实例(内有ActiveMQ中间件)

    1启动ActiveMQ 运行C:\apache-activemq-5.2.0\bin\activemq.bat 2测试 ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动 netstat -an|find "61616" C:\Documents and ...

    activemq-5.12-test

    master_slave文件夹允许您以主动-被动模式启动两个ActiveMQ代理。 代理的完整配置可以修改,因为它们是共享目录: conf /主 conf /从属 数据和日志位于conf / data目录中。 数据持久性可以通过KahaDB文件数据库来...

    activemq的几种基本通信方式总结

    activemq是JMS消息通信规范的一个实现。总的来说,消息规范里面定义最常见的几种消息通信模式主要有发布-订阅、点对点这两种。另外,通过结合这些模式的具体应用,我们在处理某些应用场景的时候也衍生出来了一种请求...

    [ActiveMQ]消息中间件基本概念及安装

    本文来自于csdn,文章简单的介绍了ActiveMQ的概念,下载,安装,启动及优缺点。ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现,它...

    ActiveMQ——Broker

    说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java 代码中,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可靠性在conf里面拷贝一个activemp.xml一样的文件命名为 activemp...

    ActiveMq由浅入深讲解+面试题50道讲解

    内容包括MQ概述和工作流程,启动过程与启动异常分析,消息的基本模型,基于队列的生产者和消费者,基于发布-订阅的生产者和消费者,消息的同步消费和异步消费,消息的数据类型,身份认证,持久化配置与实现,...

    spring-jms:带有 ActiveMQ 的 spring-jms

    这是一个简单的Spring JMS template例子,通过ActiveMQ中转消息,实现了发送和接收消息的基本功能 测试方法: 1:下载 ActiveMQ,并启动 2: 分别运行MessageReciver和MessageSender

    基于Java的消息中间件java操作demo.zip

    消息中间件java操作demo 提供activeMq的 java实现,和两种模式:点对点、发布和订阅 直接clone下来,导入maven项目启动test包下的 Client.java

    JMS入门Demo

    3.启动ActiveMQ服务器: 32位机 : 直接运行\bin\win32\activemq.bat 64位机 : 直接运行\bin\win64\activemq.bat 4.打开ActiveMQ消息管理后台系统 http://localhost:8161/admin/ 参考文章为 : HelloWorld : ...

    SpringCloud+vue+uniapp智慧物联/物业/巡检/停车管理系统源码

    ActiveMq订阅消息队列,让订单更快流转。 二、项目应用多端 管理系统后端,小区管理系统前端,小区管理系统业主手机版、小区管理系统物业手机版,适用小程序,对接公众号。 三、项目系统构成 物业管理系统:管理后台...

    SpringBoot面试专题.pdf

    4.如何重新加载 Spring Boot 上的更改,而无需重新启动服务器? 5.Spring Boot 中的监视器是什么? 6.如何在 Spring Boot 中禁用 Actuator 端点安全性? 7.如何在自定义端口上运行 Spring Boot 应用程序? 8.什么是 ...

    spring boot 实践学习案例,与其它组件整合

    spring boot 实践学习案例,与其它组件结合如 mybatis、jpa、dubbo、redis、mongodb、memcached、kafka、rabbitmq、activemq、elasticsearch、security、shiro等 #### Spring Boot 版本 - 2.0.3.RELEASE #### 模块...

    Springboot_MQTT.zip

    springboot整合mqtt实现消息的订阅和发布,提供测试使用到的各种工具方便使用,如:activemq,mqttfx,下载启动文件中的springboot_mqtt项目代码结合工具测试就OK。

    springcloud入门

    springcloud-producer:服务提供者,包含具体的业务逻辑实现等。 springcloud-consumer:服务消费者,从eureka server中获取producer提供的服务。 springcloud-gateway:网关接口,暴露给调用方调用,包含负载均衡、...

    全新JAVAEE大神完美就业实战课程 超150G巨制课程轻松实战JAVAEE课程 就业部分.txt

    09-SSH企业案例_CRM-客户...6Nginx的安装与启动 7Nginx静态网站部署 8Nginx反向代理与负载均衡 9品优购部署方案 10Docker简介 11Docker安装与启动 12Docker镜像操作 13Docker容器操作 14部署应用 15备份与迁移

Global site tag (gtag.js) - Google Analytics