Creating a minimal RabbitMQ client using Go

Mehrdad Esmaeilpour
Level Up Coding
Published in
5 min readDec 25, 2020

--

Photo by Fotis Fotopoulos on Unsplash

Go is an open source programming language that makes it easy to build simple, reliable, and efficient software. RabbitMQ is an open-source message-broker software that originally implemented the Advanced Message Queuing Protocol and has since been extended with a plug-in architecture to support Streaming Text Oriented Messaging Protocol, MQ Telemetry Transport, and other protocols.

In this tutorial, we are going to create a minimal RabbitMQ client that allows other packages consume or publish messages from or to RabbitMQ. The final source code can be found on my GitHub.

Prerequisites

  • I assume that you have installed Go already. If not, check here.
  • You need to know a little bit about Go modules. We’re going to use modules as our dependency management solution.
  • You need to be somewhat familiar with how RabbitMQ works. We’re not going to cover every aspect and types of consuming and publishing messages.

First steps

First you need to create a directory with the name of your module. I’m going to name it rmq. Then initialize go modules using go mod init command. Now you have to get the main dependency using go get github.com/streadway/amqp.

Now create a main.go file in the root directory of your module. This file is where everything is going to start (and probably end). Your main file should be like this for start:

package rmq

Now let’s create a custom type named RabbitClient . We’re going to add consumer and publisher channels and connection separately in this client.

type RabbitClient struct {
sendConn *amqp.Connection
recConn *amqp.Connection
sendChan *amqp.Channel
recChan *amqp.Channel
}

Connect and create channels

In this part we’re going to add two private methods for our custom type that tries to connect to RabbitMQ and then creates a channel based on connection type (consumer|publisher) and reconnect (try to reconnect if it’s already exists).

The connect method receives two boolean args to know about connection type and reconnect mode. We assume that you already have the information about RabbitMQ service (Username, Password, Host and Port) in a custom type named config.

// Create a connection to rabbitmq
func (rcl *RabbitClient) connect(isRec, reconnect bool) (*amqp.Connection, error) {
if reconnect {
if isRec {
rcl.recConn = nil
} else {
rcl.sendConn = nil
}
}
if isRec && rcl.recConn != nil {
return rcl.recConn, nil
} else if !isRec && rcl.sendConn != nil {
return rcl.sendConn, nil
}
var c string
if config.Username == "" {
c = fmt.Sprintf("amqp://%s:%s/", config.Host, config.Port)
} else {
c = fmt.Sprintf("amqp://%s:%s@%s:%s/", config.Username, config.Password, config.Host, config.Port)
}
conn, err := amqp.Dial(c)
if err != nil {
log.Printf("\r\n--- could not create a conection ---\r\n")
time.Sleep(1 * time.Second)
return nil, err
}
if isRec {
rcl.recConn = conn
return rcl.recConn, nil
} else {
rcl.sendConn = conn
return rcl.sendConn, nil
}
}

Same as the connect method, the channel method receives two boolean args to know about connection type and reconnect mode. This method tries forever to connect to RabbitMQ service and then create a channel based on the connection type.

func (rcl *RabbitClient) channel(isRec, recreate bool) (*amqp.Channel, error) {
if recreate {
if isRec {
rcl.recChan = nil
} else {
rcl.sendChan = nil
}
}
if isRec && rcl.recConn == nil {
rcl.recChan = nil
}
if !isRec && rcl.sendConn == nil {
rcl.recChan = nil
}
if isRec && rcl.recChan != nil {
return rcl.recChan, nil
} else if !isRec && rcl.sendChan != nil {
return rcl.sendChan, nil
}
for {
_, err := rcl.connect(isRec, recreate)
if err == nil {
break
}
}
var err error
if isRec {
rcl.recChan, err = rcl.recConn.Channel()
} else {
rcl.sendChan, err = rcl.sendConn.Channel()
}
if err != nil {
log.Println("--- could not create channel ---")
time.Sleep(1 * time.Second)
return nil, err
}
if isRec {
return rcl.recChan, err
} else {
return rcl.sendChan, err
}
}

Now that we are able to connect and create channels, let’s start to consume and publish messages. We’re going to declare lazy-mode queues that are durable in both consume and publish modes. You can change it to whatever that fits your problem.

Let’s consume something

The Consume method receives two args, one is the queue’s name and the other one is the function that handles the consumed message’s body. We’re going to ack|nack based on the result of this function.

// Consume based on name of the queue
func (rcl *RabbitClient) Consume(n string, f func(interface{}) error) {
for {
for {
_, err := rcl.channel(true, true)
if err == nil {
break
}
}
log.Printf("--- connected to consume '%s' ---\r\n", n)
q, err := rcl.recChan.QueueDeclare(
n,
true,
false,
false,
false,
amqp.Table{"x-queue-mode": "lazy"},
)
if err != nil {
log.Println("--- failed to declare a queue, trying to reconnect ---")
continue
}
connClose := rcl.recConn.NotifyClose(make(chan *amqp.Error))
connBlocked := rcl.recConn.NotifyBlocked(make(chan amqp.Blocking))
chClose := rcl.recChan.NotifyClose(make(chan *amqp.Error))
m, err := rcl.recChan.Consume(
q.Name,
uuid.NewV4().String(),
false,
false,
false,
false,
nil,
)
if err != nil {
log.Println("--- failed to consume from queue, trying again ---")
continue
}
shouldBreak := false
for {
if shouldBreak {
break
}
select {
case _ = <-connBlocked:
log.Println("--- connection blocked ---")
shouldBreak = true
break
case err = <-connClose:
log.Println("--- connection closed ---")
shouldBreak = true
break
case err = <-chClose:
log.Println("--- channel closed ---")
shouldBreak = true
break
case d := <-m:
err := f(d.Body)
if err != nil {
_ = d.Ack(false)
break
}
_ = d.Ack(true)
}
}
}
}

The Consume method handles NotifyClose, NotifyBlocked and NotifyClose from the connection and channel and try to reconnect or recreate them if needed.

Let’s publish something

The Publish method receives three args, one is the queue’s name and the other one is the array of bytes and contains the message’s body.

// Publish an array of bytes to a queue
func (rcl *RabbitClient) Publish(n string, b []byte) {
r := false
for {
for {
_, err := rcl.channel(false, r)
if err == nil {
break
}
}
q, err := rcl.sendChan.QueueDeclare(
n,
true,
false,
false,
false,
amqp.Table{"x-queue-mode": "lazy"},
)
if err != nil {
log.Println("--- failed to declare a queue, trying to resend ---")
r = true
continue
}
err = rcl.sendChan.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
MessageId: uuid.NewV4().String(),
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: b,
})
if err != nil {
log.Println("--- failed to publish to queue, trying to resend ---")
r = true
continue
}
break
}
}

This method handles reconnect or recreation of channel if needed.

Usage

Create an instance of the RabbitClient type and use Consume or Publish method.

var rc rmq.RabbitClientrc.Consume("test-queue", funcName)rc.Publish("test-queue", mBody)

What I didn’t cover

  • Different types of queue declration
  • Using exchanges
  • Unit tests

--

--