本文共 3270 字,大约阅读时间需要 10 分钟。
上篇文章讲了简单队列的使用,这其实就是RMQ给的demo,实际并没有什么用
本篇讲讲工作模式队列,也称之为任务队列
一个生产者发布了多条消息,消费者A可以接受消息,接受消息后该消息就消除,消费者B可以接受其他消息
使用场景,一些数据库操作比较缓慢的话可以分别给多个接口调用,降低压力,或者抢单场景也能考虑,
比如就10个商品,100个消费者来抢单,前10个抢到了后,消息队列就为空了,那么第11个以后的所有消费者都不会抢到
代码示例:
生产者
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 // 获取到连接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 声明队列11 channel.queueDeclare(QUEUE_NAME, false, false, false, null);12 13 for (int i = 0; i < 50; i++) {14 // 消息内容15 String message = "" + i;16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes());17 System.out.println(" [x] Sent '" + message + "'");18 19 Thread.sleep(i * 10);20 }21 22 channel.close();23 connection.close();24 }25 }
消费者1
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel();10 11 // 声明队列12 channel.queueDeclare(QUEUE_NAME, false, false, false, null);13 14 // 同一时刻服务器只会发一条消息给消费者, 如果注释了就是指生产者平均分配任务给消费者15 channel.basicQos(1);16 17 // 定义队列的消费者18 QueueingConsumer consumer = new QueueingConsumer(channel);19 // 监听队列,手动返回完成 设置fasle代表需要手动返回消息的确认状态20 channel.basicConsume(QUEUE_NAME, false, consumer);21 22 // 获取消息23 while (true) {24 QueueingConsumer.Delivery delivery = consumer.nextDelivery();25 String message = new String(delivery.getBody());26 System.out.println(" [x] Received '" + message + "'");27 // 休眠28 Thread.sleep(10);29 // 手动确认 返回确认状态30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);31 }32 }33 }
消费者2
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 获取到连接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel();10 11 // 声明队列12 channel.queueDeclare(QUEUE_NAME, false, false, false, null);13 14 // 同一时刻服务器只会发一条消息给消费者, 如果注释了就是指生产者平均分配任务给消费者15 channel.basicQos(1);16 17 // 定义队列的消费者18 QueueingConsumer consumer = new QueueingConsumer(channel);19 // 监听队列,手动返回完成状态 设置fasle代表需要手动返回消息的确认状态20 channel.basicConsume(QUEUE_NAME, false, consumer);21 22 // 获取消息23 while (true) {24 QueueingConsumer.Delivery delivery = consumer.nextDelivery();25 String message = new String(delivery.getBody());26 System.out.println(" [x] Received '" + message + "'");27 // 休眠1秒28 Thread.sleep(1000);29 // 手动确认 返回确认状态30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);31 }32 }33 }
转载地址:http://ftqya.baihongyu.com/