Tuesday, 13 March 2018

RabbitMQ: How to test a queue exists

When a RabbitMQ cluster exists, queue may be replicated only on some nodes and in case those nodes are down the queue won't work even if there are other nodes up and running.

By sending a message to  RabbitMQ we can see if Exchange node is up or not.
Using a Spring RabbitTemplate we can use:

convertAndSend(java.lang.Object object)
Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key.

If RabbitMQ cluster is down, this method will throw an AmpqException.

We can think to send a message to a specific queue by using:

convertAndSend(java.lang.String routingKey, java.lang.Object object)
Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.

But if the queue name specified by routingKey does not exist, the method will not throw  AmpqException as in previous case, but will end silently.

To actually test if a queue exists (up & running) there is a way also used by Spring in RabbitAdminIntegrationTest:

/**
* Verify that a queue exists using the native Rabbit API to bypass all the connection and channel caching and callbacks in Spring AMQP.
* * @param queue The queue to verify * @return True if the queue exists */ private boolean queueExists(final Queue queue) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(BrokerTestUtils.getPort()); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); try { DeclareOk result = channel.queueDeclarePassive(queue.getName()); return result != null; } catch (IOException e) { return e.getCause().getMessage().contains("RESOURCE_LOCKED"); } finally { connection.close(); } }

To replicate a queue, whose name starts with ha, to all nodes in the cluster we can use following command: 

rabbitmqctl set_policy ha-all "^ha\." '{"ha-mode":"all"}'
Starting from RabbitMQ 3.0 this is the only way, and programmatic definition of policies in xml files is ignored.