ActiveMQ支持的协议
ActiveMQ支持多种协议传输和传输方式,允许客户端使用多种协议连接.
ActiveMQ支持的协议: AUTO、OpenWire、AMQP、Stomp、MQTT等
在${ACTIVE_HOME}/conf/activemq.xml
中, 通过配置<transportConnectors>
就可以使用多种传输方式(==注意配置文件中可能会用到转义字符串, 比如&要用&来表示==)
ActiveMQ支持的基础传输方式有: VM、TCP、SSL、UDP、Peer、Multicast、HTTP(S)等
由于TCP的稳定性, 它也是ActiveMQ中最常用的一种传输方式. 在默认的设置中, TCP连接的端口为61616
OpenWire协议
OpenWire协议是Apache的一种跨语言的协议,允许从不同的语言和平台访问ActiveMQ
默认就是使用这种协议, 功能很全面
OpenWire支持TCP、SSL、NIO、UDP、VM等传输方式,但URL只配置传输方式时,默认就是使用OpenWire协议
MQTT协议
全称Message Queuing Telemetry Transport,即消息队列遥测传输,由IBM开发, 已成为物联网通信的标准
它结构简单,相比其他的协议更加轻量级
发布-订阅模型
MQTT协议使用发布-订阅模型
提供者发布消息到主题topic上, 只要订阅了这个topic的消费者,都能收到这条消息
消费者无法收到启动前topic上的消息
MQTT中的服务质量(QoS)
服务质量(QoS)级别 是一种关于发送者和接受者之间信息传递的保证协议
MQTT支持三种QoS
至多一次(0)客户端只发布一次消息到服务器
至少一次(1)客户端发送消息,直到服务器返回成功
只有一次(2)在(1)的前提下, 客户端继续发送, 客户端查看是否存在, 有就删除
QoS是的在不可靠的网络下进行通信变得更加简单,因为即使是在非常不可靠的网络下,协议也可以掌控是否需要重发消息, 并保证消息到达
ActiveMQ中使用MQTT协议
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| public class MqttProducer { private static int qos = 1; private static String broker = "tcp://activemq.tony.com:1883"; private static String userName = "admin"; private static String passWord = "admin";
private static MqttClient connect(String clientId, String userName,String password) throws MqttException { MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(userName); connOpts.setPassword(password.toCharArray()); connOpts.setConnectionTimeout(10); connOpts.setKeepAliveInterval(20); MqttClient mqttClient = new MqttClient(broker, clientId, persistence); mqttClient.setCallback(new PushCallback("test")); mqttClient.connect(connOpts); return mqttClient; }
private static void pub(MqttClient sampleClient, String msg, String topic) throws Exception { MqttMessage message = new MqttMessage(msg.getBytes()); message.setQos(qos); message.setRetained(false); sampleClient.publish(topic, message); }
private static void publish(String str, String clientId, String topic) throws Exception { MqttClient mqttClient = connect(clientId, userName, passWord); if (mqttClient != null) { pub(mqttClient, str, topic); System.out.println("pub-->" + str); } if (mqttClient != null) { mqttClient.disconnect(); } }
public static void main(String[] args) throws Exception { publish("message content", "producer-client-id-0", "x/y/z"); } }
class PushCallback implements MqttCallback { private String threadId;
public PushCallback(String threadId) { this.threadId = threadId; }
public void connectionLost(Throwable cause) { cause.printStackTrace(); }
public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("服务器是否正确接收---------" + token.isComplete()); }
public void messageArrived(String topic, MqttMessage message) throws Exception { String msg = new String(message.getPayload()); System.out.println(threadId + " " + msg); } }
public class MqttConsumer { private static int qos = 2; private static String broker = "tcp://activemq.tony.com:1883"; private static String userName = "admin"; private static String passWord = "admin";
private static MqttClient connect(String clientId) throws MqttException { MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(false); connOpts.setUserName(userName); connOpts.setPassword(passWord.toCharArray()); connOpts.setConnectionTimeout(10); connOpts.setKeepAliveInterval(20); MqttClient mqttClient = new MqttClient(broker, clientId, persistence); mqttClient.connect(connOpts); return mqttClient;
}
public static void sub(MqttClient mqttClient, String topic) throws MqttException { int[] Qos = {qos}; String[] topics = {topic}; mqttClient.subscribe(topics, Qos, new IMqttMessageListener[]{(s, mqttMessage) -> { System.out.println("收到新消息" + s + " > " + mqttMessage.toString()); }}); }
private static void runsub(String clientId, String topic) throws MqttException { MqttClient mqttClient = connect(clientId); if (mqttClient != null) { sub(mqttClient, topic); } }
public static void main(String[] args) throws MqttException { runsub("consumer-client-id-1", "x/y/z"); }
}
|
AUTO协议
AUTO自动检测协议,可以自动检测ActiveMQ支持的所有协议, 允许使用各种协议的客户端,使用同一个传输
Stomp协议
可以使用webSocket传输协议