RabbitMQ入门
RabbitMQ是一个开源的机遇AMQP协议的实现,服务端用Erlang语言编写,支持多种客户端,用于在分布式系统中存储转发消息
AMQP协议
RabbitMQ的安装
- 官网安装rabbitmq,注意需要安装erlang对应的版本
- 安装完,可以直接启动服务
systemctl start rabbitmq-server
Web管理平台
开启插件rabbitmq-plugins enable rabbitmq_management
创建新用户,配置权限
rabbitmqctl add_user admin admin
分配操作权限
rabbitmqctl set_user_tags admin administrator
分配资源权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
开启防火墙
java中的使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
public class Producer {
public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.100.242"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin");
Connection connection = null; Channel channel = null;
try { connection = factory.newConnection("生产者");
channel = connection.createChannel();
channel.queueDeclare("queue1", false, false, false, null);
String message = "Hello World!"; channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println("消息已发送!");
} catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public class Consumer {
public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.100.242"); factory.setUsername("admin"); factory.setPassword("admin");
Connection connection = null; Channel channel = null;
try { connection = factory.newConnection("消费者");
channel = connection.createChannel();
channel.queueDeclare("queue1", false, false, false, null);
DeliverCallback callback = new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息:" + new String(message.getBody(), "UTF-8")); } }; channel.basicConsume("queue1", true, callback, new CancelCallback() { public void handle(String consumerTag) throws IOException { } });
System.out.println("开始接收消息"); System.in.read();
} catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
|
Anything can go right will go right