Level Up Coding

Coding tutorials and news. The developer homepage gitconnected.com && skilled.dev && levelup.dev


Connecting a service in Golang to a RabbitMQ server

Gopher and RabbitMQ

In the previous article we structured a service in Go so that it can start and finish internal processes. Now that we have a working service, let’s make a RabbitMQ client to handle all RabbitMQ processes inside it. We will use the rabbitmq/amqp091-go library to connect, consume and publish messages. Come with me on this journey!

First of all, it’s important to be familiar with the basic RabbitMQ concepts, such as queues, exchanges and channels. If you are new to them, I recommend reading the RabbitMQ documentation before continuing.

Where do I set up the RabbitMQ client?

I usually say that Go is package oriented, so a good way to organize the service is to split features and responsibilities into packages. That lets us keep all RabbitMQ-related code in a single package whose name makes its features and responsibilities clear, like queuerabbit.

Example of package structure

Imagine there is a bug that causes messages to never reach your service. With the structure above, where would you start looking for it? Certainly in the queuerabbit package, whose name combines the words queue and rabbit, a reference to RabbitMQ.

The responsibilities of this package are easy to state: it opens and maintains the connection to RabbitMQ, and it handles the messages.

Connecting to RabbitMQ

The first thing to do is to make a connection. A quick look into the documentation and you find the function conn, err = amqp.Dial(uri). So you create the channels and it’s ready, your connection is done, right? Right?

What happens if there is a problem with the network and the connection is closed? It’s a good practice to be prepared for the worst scenarios.

The function Start(ctx context.Context) creates a goroutine that keeps trying to connect to the server. If the connection drops for whatever reason, the service comes back online without human intervention as soon as the problem is solved.

The context passed to Start must be cancelable, that is, created together with a CancelFunc (for example via context.WithCancel). That way the context can be canceled and the goroutine responsible for keeping the connection alive stops running when the service is shutting down.

One of the main concerns when creating a service is resource usage. We don’t have infinite CPU and memory, so the prefetchCount parameter, which limits how many unacknowledged messages the server delivers to a consumer at once, is very important. If the value is 0 there is no limit, so the server may push every message in the queue at once and exceed the service’s memory limit.
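One way to guard against an accidental prefetchCount of 0 is a small helper like the one below. It is a sketch under assumptions: applyPrefetch and qosFunc are hypothetical names. The function type mirrors the signature of the library's Channel.Qos method, so real code could pass ch.Qos as the first argument.

```go
package main

import "fmt"

// qosFunc mirrors the signature of (*amqp.Channel).Qos in rabbitmq/amqp091-go,
// so a method value such as ch.Qos can be passed in directly.
type qosFunc func(prefetchCount, prefetchSize int, global bool) error

// applyPrefetch is a hypothetical helper that refuses a non-positive
// prefetchCount, since 0 would mean "no limit" and could flood the service.
func applyPrefetch(qos qosFunc, prefetchCount int) error {
	if prefetchCount <= 0 {
		return fmt.Errorf("prefetchCount must be positive, got %d", prefetchCount)
	}
	// prefetchSize 0 means no byte limit; global is left false
	// (see the library's Qos documentation for its exact scope).
	return qos(prefetchCount, 0, false)
}
```

With a real channel this would be called as applyPrefetch(ch.Qos, 10) before starting any consumer.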

Consuming messages

The rabbitmq/amqp091-go library is very well structured. When you call the function channel.Consume() it returns a Go channel through which the messages arrive, which is perfect for a goroutine that reads messages indefinitely.

The challenge is synchronizing message reading with the service shutdown while avoiding data loss. A key piece of this puzzle is the function channel.Cancel, which stops the delivery of new messages to a consumer. It lets the service process the messages it has already received before closing the connection to the server.

For each consumer and each message we call the functions wg.Add(1) and wg.Done() from sync.WaitGroup, which allows us to call wg.Wait() before closing the connection when the service is shutting down, so we can successfully read and process all the messages from all active consumers.

The goroutine that consumes the messages also reacts to the canceled context: it stops reading new messages while still allowing the last received ones to be processed.

Publishing messages

Publishing is very simple. Let’s just use the same sync.WaitGroup as the consumer so that, when the service is going down, it waits for in-flight messages to be sent.

The only problem that may arise is a cyclic reference, which Go rejects at compile time as an import cycle. In other words, the queue package could call a function in another package which, in turn, calls the publishing function inside the queue package. To avoid this issue, you can create a publisher interface and a struct that implements it.

Closing the connection

Now that the service is running and handling messages, the time has come to terminate it. We begin by calling the cancel() function that was created when the service was initialized. The goroutine keeping the connection open then ends, and all the goroutines consuming messages stop accepting new ones, deal with the remaining messages and finish smoothly. Once everything is done we can close the channels and the connection to the server.

The purpose of the first goroutine created in Close(ctx context.Context) is to wait until all messages have been handled. The second one waits for the signal from the first, or for the context’s timeout, and then closes the channels and the connection. The function returns a channel that signals when everything is finished. With that, the main goroutine can shut down other processes and then wait for the signal that everything finished successfully.

Simplifying the service’s structure

Even with all this explanation and code, it can still be hard to consolidate everything in a single package. That’s why I created a library that connects, consumes, publishes and closes the connections, making it very simple to create a new package. Everything can be found on GitHub.

I also created a sample service that uses this library as an example. It is also on GitHub.

Conclusion

RabbitMQ is an extremely powerful tool for asynchronous communication between microservices, and structuring a package that interacts with it is challenging. Keep in mind that you must know how to finish everything you start in order to avoid losing messages. This brings consistency and robustness to the system.

I hope this tutorial has been useful. Leave a comment in the section below.

Written by Gabriel Beletti

Backend developer | Tech Lead | Engineering Manager | Golang | Kubernetes | Microservices
