Mastering RabbitMQ with Spring Boot: Setting Up Beans and Implementing Dead Letter Queues (Part 2)
Welcome to the second post in my short series about RabbitMQ and Spring Boot with the AMQP starter. If you missed the first one, you can find it here. This time I will cover a few more topics that may interest any developer building a solution on this stack. I'll begin by creating a configuration class for queues and exchanges using built-in beans, then introduce the dead letter queue, which is quite important in production environments.
The whole project can be found on my GitHub: https://github.com/szymon-sawicki/rabbitmq-products-app.
RabbitMQ configuration using beans
In the last post, I mentioned that in production environments RabbitMQ is usually configured by DevOps engineers or admins. But for learning purposes, or when using Testcontainers, configuring it with beans can save a lot of time. Of course, this configuration can also be done with a Dockerfile and Docker Compose (if you're interested, check this post), but that creates some overhead, and for this simple project we don't need it.
Creating RabbitMQ queue in Spring Boot:
First we need queues, so we create beans of type Queue. Each bean is named after its queue for clarity. We can create a bean in two ways:
    // creating a bean using the constructor (arguments: name, durable)
    @Bean
    Queue qBooks() {
        return new Queue("q.books", true);
    }
    // creating a bean using the builder
    @Bean
    Queue qClothing() {
        return QueueBuilder.durable("q.clothing").build();
    }
Creating RabbitMQ exchange in Spring Boot:
RabbitMQ has four types of exchanges, and each of them is represented by its own bean type in Spring Boot:
    // Exchanges (constructor arguments: name, durable, autoDelete)
    @Bean
    DirectExchange xProductsDirect() {
        return new DirectExchange("x.products-direct", true, false);
    }
    @Bean
    FanoutExchange xProductsFanout() {
        return new FanoutExchange("x.products-fanout", true, false);
    }
    @Bean
    HeadersExchange xProductsHeader() {
        return new HeadersExchange("x.products-header", true, false);
    }
    @Bean
    TopicExchange xProductsTopic() {
        return new TopicExchange("x.products-topic", true, false);
    }
Creating RabbitMQ bindings in Spring Boot:
Direct bindings:
In each case I'll use BindingBuilder, which is more readable than the constructor. The bind() method takes the queue as its argument, to() takes the exchange, and the last method, with(), sets the routing key.
    @Bean
    Binding bindingDirectElectronics() {
        return BindingBuilder.bind(qElectronics()).to(xProductsDirect()).with("ELECTRONICS");
    }
Topic bindings:
    @Bean
    Binding bindingTopicBooks() {
        return BindingBuilder.bind(qBooks()).to(xProductsTopic()).with("BOOKS.#");
    }
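To make the BOOKS.# pattern more concrete, here is a minimal plain-Java sketch of how a topic pattern is compared with a routing key: * stands for exactly one word and # for zero or more words. The class TopicPatternSketch and its matches method are hypothetical names made up for this illustration. This is not RabbitMQ's actual implementation, only a model of the matching rules.

```java
import java.util.List;

// Illustrative model of topic matching; the broker's real code differs.
class TopicPatternSketch {

    // True when the dot-separated routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern used up: the key must be used up too
        }
        if (p[pi].equals("#")) {
            // let '#' consume 0, 1, 2, ... of the remaining words
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            return false; // pattern expects a word but the key has none left
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        for (String key : List.of("BOOKS", "BOOKS.fantasy", "BOOKS.fantasy.new", "CLOTHING.shoes")) {
            System.out.println(key + " -> " + matches("BOOKS.#", key));
        }
    }
}
```

With the binding above, keys such as BOOKS, BOOKS.fantasy and BOOKS.fantasy.new would reach q.books, while CLOTHING.shoes would not. A pattern like BOOKS.* would accept only keys with exactly one word after BOOKS.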
Fanout bindings:
A fanout exchange doesn't need a routing key.
    @Bean
    Binding bindingFanoutWarehouse() {
        return BindingBuilder.bind(qWarehouse()).to(xProductsFanout());
    }
Header bindings:
To define the arguments of a binding to a headers exchange, we create a HashMap and put the arguments in as its entries. If you need to match several values for the same header, create a separate binding for each of them, because a second entry with the same key would overwrite the first one in the map.
    @Bean
    Binding bindingHeaderExpensiveClothings() {
        var headers = new HashMap<String, Object>();
        // match only when all listed headers are present with these values
        headers.put("x-match", "all");
        headers.put("category", "CLOTHINGS");
        headers.put("price-range", "high");
        return BindingBuilder.bind(qExpensiveClothings()).to(xProductsHeader()).whereAll(headers).match();
    }
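The effect of x-match is easier to see in isolation. The following plain-Java sketch models the comparison as I understand it: with "all" every binding argument must equal the corresponding message header, with "any" one equal pair is enough, and arguments starting with x- are not compared. HeadersMatchSketch and matches are hypothetical names for this illustration, and the sketch only covers simple equality, not everything the broker does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative model of headers-exchange matching; not RabbitMQ's actual code.
class HeadersMatchSketch {

    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        // treat anything other than "any" as "all"
        boolean matchAll = !"any".equals(bindingArgs.get("x-match"));
        boolean anyMatched = false;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // x-match and other x- arguments are not compared
            }
            boolean equal = Objects.equals(arg.getValue(), messageHeaders.get(arg.getKey()));
            if (matchAll && !equal) {
                return false; // "all": one mismatch is enough to fail
            }
            anyMatched = anyMatched || equal;
        }
        return matchAll || anyMatched;
    }

    public static void main(String[] args) {
        // same arguments as in bindingHeaderExpensiveClothings
        Map<String, Object> binding = new HashMap<>();
        binding.put("x-match", "all");
        binding.put("category", "CLOTHINGS");
        binding.put("price-range", "high");

        Map<String, Object> cheap = Map.of("category", "CLOTHINGS", "price-range", "low");
        Map<String, Object> expensive = Map.of("category", "CLOTHINGS", "price-range", "high");

        System.out.println("all, cheap: " + matches(binding, cheap));
        System.out.println("all, expensive: " + matches(binding, expensive));
        binding.put("x-match", "any");
        System.out.println("any, cheap: " + matches(binding, cheap));
    }
}
```

In this model a cheap clothing message is skipped by the "all" binding but would be accepted if the binding used "any", which is why the choice between whereAll() and whereAny() matters.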
Dead letter queue in RabbitMQ and Spring Boot
Basically, in RabbitMQ, a dead letter queue is a queue that stores messages that could not be processed successfully. These could be messages that a consumer rejected without requeueing (for example because the payload could not be parsed, or because processing failed after a predetermined number of attempts), messages whose TTL expired, or messages dropped because the queue exceeded its length limit. Instead of discarding these messages, RabbitMQ can be configured to republish them to a dead letter exchange, which routes them to a dead letter queue for later inspection or reprocessing.
In Spring Boot, you configure dead-lettering by declaring the queue with a dead letter exchange and, optionally, a dead letter routing key. Here is an example of a queue with a dead letter exchange:
    @Bean
    Queue qWarehouse() {
        return QueueBuilder.durable("q.warehouse")
                .withArgument("x-dead-letter-exchange", "x.warehouse-failures")
                .withArgument("x-dead-letter-routing-key", "fall-back")
                .build();
    }
In the above configuration, the q.warehouse queue is set up to route dead-lettered messages to the x.warehouse-failures exchange with the routing key fall-back.
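For the dead-lettered messages to land somewhere, the x.warehouse-failures exchange has to exist and a queue has to be bound to it with the fall-back routing key. The snippet above does not show that part, so here is a minimal sketch of how it could look. The exchange type (direct), the queue name q.warehouse-failures and the class name WarehouseFailuresConfig are my assumptions for this illustration; the project on GitHub may wire it differently.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch only: names other than "x.warehouse-failures" and "fall-back" are assumed.
@Configuration
class WarehouseFailuresConfig {

    // the dead letter exchange referenced by q.warehouse (assumed to be a direct exchange)
    @Bean
    DirectExchange xWarehouseFailures() {
        return new DirectExchange("x.warehouse-failures", true, false);
    }

    // the dead letter queue itself (hypothetical name)
    @Bean
    Queue qWarehouseFailures() {
        return QueueBuilder.durable("q.warehouse-failures").build();
    }

    // routing key must equal the x-dead-letter-routing-key set on q.warehouse
    @Bean
    Binding bindingWarehouseFailures() {
        return BindingBuilder.bind(qWarehouseFailures())
                .to(xWarehouseFailures())
                .with("fall-back");
    }
}
```

If the dead letter exchange has no matching binding, the broker has nowhere to route the rejected message, so it is worth checking this wiring first when the dead letter queue stays empty.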
During testing you can force message rejection in a listener consuming messages from the q.warehouse queue. Whenever a product name contains the string “bad product”, the message is rejected and routed to the dead letter queue:
    @RabbitListener(queues = "q.warehouse")
    public void getWarehouseProducts(String jsonMessage) {
        try {
            var product = new ObjectMapper().readValue(jsonMessage, Product.class);
            // test hook: fail on purpose for products named "bad product"
            if (product.getName().contains("bad product")) {
                throw new RuntimeException("Intentionally rejecting message - product has wrong name: " + product);
            }
            log.info("Warehouse - product received: " + product);
            warehouseProducts.add(product);
        } catch (Exception e) {
            // reject without requeueing so the broker dead-letters the message
            throw new AmqpRejectAndDontRequeueException("Failed to process message", e);
        }
    }
By throwing an AmqpRejectAndDontRequeueException, the listener tells the broker not to requeue the message, so it is routed to the dead letter exchange configured earlier.
Summary
In this post, we have seen how to configure RabbitMQ with Spring Boot's AMQP starter by declaring queues, exchanges, and bindings as Spring beans. We have also seen why dead letter queues are important in a production environment, and how to configure one to catch messages that could not be processed. This setup is important for robust, fault-tolerant messaging systems, where the developer can inspect and reprocess the messages that failed the first time. For a complete view of the project, have a look at the code on my GitHub.