ActiveMQ入门
ActiveMQ是Apache出品, 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现
尽管JMS已经出台很久, 但在J2EE中仍然扮演者重要角色
ActiveMQ特性
- 支持多种编程语言
- 支持多种传输协议
- 支持多种持久化方式(文件系统/数据库)
ActiveMQ的安装
- 从官网下载ActiveMQ
- 利用Xftp将文件传到服务器
- 解压到
/var
目录下
- 修改文件名为
activemq
ActiveMQ启动
在activemq文件下使用命令./bin/activemq start
后台启动ActiveMQ
但最好是将ActiveMQ作为一个服务来启动,这样可以使activemq在系统启动时自动启动
创建ActiveMQ服务
使用vim创建一个服务文件
vi /usr/lib/systemd/system/activemq.service
输入内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| [Unit] Description=ActiveMQ service After=network.target
[Service] Type=forking ExecStart=/var/activemq/bin/activemq start ExecStop=/var/activemq/bin/activemq stop User=root Group=root Restart=always RestartSec=9 StandardOutput=syslog StandardError=syslog SyslogIdentifier=activemq
[Install] WantedBy=multi-user.target
|
修改/var/activemq/bin/env
文件
将JAVA_HOME的注释删除,然后填入系统jdk的安装路径
启动ActiveMQ服务
使用systemctl start activemq
来启动服务
查看服务状态
使用命令systemctl status activemq
设置开机自动启动
ln -s /usr/lib/systemd/system/activemq.service/etc/systemd/system/multi-user.target.wants/activemq.service
systemctl enable activemq
在防火墙添加ActiveMQ的Web管理端口和通讯端口
1 2 3 4
| #Web管理端口默认为8161,通讯端口默认为61616 ufw allow 8161 ufw allow 61616 # 部分服务器需要在对应的服务提供商管理页面修改出入站规则
|
浏览器访问http://服务IP:8161/admin
进入管理平台
账户默认:admin
密码默认:admin
Web管理平台配置
ActiveMQ的Web管理平台是基于jetty运行,因此在/var/activemq/conf目录可以查看jetty的配置文件
在/var/activemq/conf/jetty.xml
文件中,可以进行修改端口,密码等内容
使用ActiveMQ
在Java中使用
1 2 3 4 5
| <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.8</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
public class Producer { public static void main(String[] args) { new ProducerThread("tcp://119.3.218.159:61616", "queue1").start(); }
static class ProducerThread extends Thread { String brokerUrl; String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) { this.brokerUrl = brokerUrl; this.destinationUrl = destinationUrl; }
@Override public void run() { ActiveMQConnectionFactory connectionFactory; Connection conn; Session session;
try { connectionFactory = new ActiveMQConnectionFactory(brokerUrl); conn = connectionFactory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationUrl); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); String text = "Hello world!"; TextMessage message = session.createTextMessage(text); for (int i = 0; i < 1; i++) { producer.send(message); } session.close(); conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
public class Consumer { public static void main(String[] args) { new ConsumerThread("tcp://119.3.218.159:61616", "queue1").start(); new ConsumerThread("tcp://119.3.218.159:61616", "queue1").start(); }
static class ConsumerThread extends Thread {
String brokerUrl; String destinationUrl;
public ConsumerThread(String brokerUrl, String destinationUrl) { this.brokerUrl = brokerUrl; this.destinationUrl = destinationUrl; }
@Override public void run() { ActiveMQConnectionFactory connectionFactory; Connection conn; Session session; MessageConsumer consumer;
try { connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl); conn = connectionFactory.createConnection(); conn.start(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(destinationUrl);
consumer = session.createConsumer(destination);
Message message = consumer.receive(); if (message instanceof TextMessage) { System.out.println("收到文本消息:" + ((TextMessage) message).getText()); } else { System.out.println(message); }
consumer.close(); session.close(); conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } }
|
在Spring中使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.1.3.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-broker</artifactId> <version>5.15.8</version> <exclusions> <exclusion> <groupId>geronimo-jms_1.1_spec</groupId> <artifactId>org.apache.geronimo.specs</artifactId> </exclusion> </exclusions> </dependency>
|
web控制台
activemq支持web控制台
http://host:8161/admin
默认账号admin,密码admin
持久化
ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化,消息的存储逻辑都是一致的
- Queue(消息队列)类型的持久化机制
- Topic(消息订阅)类型的持久化机制
持久化机制
- JDBC: 存入数据库,方便管理,性能低
- AMQ: 基于文件的存储方式,写入速度快,且易于恢复,但是建索引时间长
- KahaDB: 默认方式,相比AMQ恢复更快,并且占用数据量更少
- LevelDB: 谷歌开发的持久化高性能类库.
Anything can go right will go right