Building Chat Service in Golang and Websockets Backed by Redis

Mohammed Hewedy
Published in Level Up Coding
14 min read · May 24, 2020


High-level arch of the solution

In this post, we will see how to use Redis pub-sub to implement a horizontally scalable chat service, following this agenda:

💎 What is Pub-sub?
💎 Design Decisions
💎 Let’s Code (the most boring part 🔥, but stay tuned)
💎 Chat Service in action
💎 Test horizontal scalability

What is Pub-sub?

In the post “Implement Job Queue using Redis” of this series, we saw how to use Redis lists and BLPOP to pop from a list and block until a new element arrives. In this post we will look at the pub-sub pattern, which has first-class support in Redis.

So what is pub-sub?

The pub-sub pattern allows senders of messages, called publishers, to publish messages to receivers, called subscribers, through a channel without knowing which subscribers exist, if any. All subscribers that exist at the time a message is published receive it at the same time.

This is different from message queues, which provide an asynchronous communication protocol between the sender and the receiver, so they do not need to interact with the queue at the same time. Messages placed onto the queue are stored until the recipient retrieves them.

With a queue, a single recipient retrieves each message, in contrast to pub-sub, where all active subscribers receive it. Also, in pub-sub, if there is no active subscriber, the message is lost.
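To make the two pub-sub properties concrete (fan-out to every active subscriber, and messages dropped when nobody is listening), here is a toy in-memory broker built on Go channels. It is only an illustration under the assumption that an in-process map of topics is a fair model; the Broker type is hypothetical and is not how Redis or the chat service is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a toy, in-memory stand-in for Redis pub-sub (hypothetical type):
// every active subscriber of a topic gets its own copy of each message, and a
// message published while nobody is subscribed is simply dropped.
type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan string
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers a new subscriber on topic and returns its inbox.
// The inbox is buffered so Publish does not block in this small demo.
func (b *Broker) Subscribe(topic string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 16)
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// Publish fans msg out to every current subscriber of topic and returns how
// many subscribers received it (Redis PUBLISH also returns this count).
func (b *Broker) Publish(topic, msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.subs[topic] {
		ch <- msg
	}
	return len(b.subs[topic])
}

func main() {
	b := NewBroker()
	fmt.Println(b.Publish("general", "lost")) // 0: nobody listening, message dropped

	wael := b.Subscribe("general")
	mazen := b.Subscribe("general")
	fmt.Println(b.Publish("general", "hello")) // 2
	fmt.Println(<-wael, <-mazen)               // hello hello
}
```

A queue would instead hand "hello" to exactly one consumer and keep "lost" until someone reads it.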

Design Decisions

  1. The chat service allows users to subscribe to multiple channels (this could also be used to implement direct messaging).
  2. The chat service is designed to be horizontally scalable. It is a stateless service (state is kept only as long as the WebSocket connection is open, and the user can establish multiple connections to multiple servers). By depending on Redis, we can scale the chat service horizontally without worrying about the stateful nature of the WebSocket protocol.
  3. The Redis client opens a TCP connection per subscription, which would mean opening too many connections from the chat service to Redis if we opened one for each channel a user subscribes to.
    To avoid this, whenever the user subscribes to a new channel, I cancel the old subscription and subscribe to all of the user's channels at once (Redis allows subscribing to multiple channels in a single command).
  4. The chat service registers all connected users in two channels by default, “general” and “random” (following Slack conventions). The user can subscribe to any arbitrary channel at any time, and only users subscribed to a channel can see its messages.
  5. I used WebSockets to handle the communication between the JavaScript client and the Chat Service. (I like it.)
  6. The JavaScript client sends messages in JSON to the WebSocket API using the following structure: {"command": <0=Subscribe, 1=Unsubscribe or 2=Chat>, "channel": "channel name", "content": "content text"}
  7. The service provides a couple of REST APIs to get the users and the channels a particular user is subscribed to (which would help to build a GUI on top of the service).
  8. The Chat service doesn’t handle user management or authentication aspects.
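The message format in item 6 can be decoded with the standard encoding/json package. The following is a minimal sketch, assuming the field names from item 6; ClientMessage and parseClientMessage are hypothetical names used only here (the service's own struct, msg, appears in the code section).

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Command values from the message format in the design decisions.
const (
	CommandSubscribe   = 0
	CommandUnsubscribe = 1
	CommandChat        = 2
)

// ClientMessage mirrors the JSON the JavaScript client sends (hypothetical type).
type ClientMessage struct {
	Command int    `json:"command"`
	Channel string `json:"channel"`
	Content string `json:"content"`
}

// parseClientMessage decodes one WebSocket frame and rejects unknown commands.
func parseClientMessage(data []byte) (ClientMessage, error) {
	var m ClientMessage
	if err := json.Unmarshal(data, &m); err != nil {
		return m, err
	}
	if m.Command < CommandSubscribe || m.Command > CommandChat {
		return m, fmt.Errorf("unknown command %d", m.Command)
	}
	return m, nil
}

func main() {
	m, err := parseClientMessage([]byte(`{"command": 2, "channel": "general", "content": "hi"}`))
	fmt.Println(m.Command == CommandChat, m.Channel, m.Content, err) // true general hi <nil>
}
```

Validating the command up front keeps the dispatch code from silently ignoring typos such as `"command": 5`.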

Let’s Code 🚀🚀🚀

In this (long) section we will go through the most important pieces of the code; the full source code is hosted on GitHub at https://github.com/mhewedy-playground/Chat

High-level arch for the Chat Service

The user component:

Let’s start with the struct definition:

type User struct {
	name             string
	channelsHandler  *redis.PubSub
	stopListenerChan chan struct{}
	listening        bool
	MessageChan      chan redis.Message
}

name: the name of the user

channelsHandler: handle to the connection used by the Redis SUBSCRIBE command. Each time the user subscribes to a new channel, we use this handle to end the current subscription, start a new subscription to the new channel list (old channels + the new one), and then update this reference with the value returned from the subscribe function.

// cancel the current subscription
u.channelsHandler.Unsubscribe()
u.channelsHandler.Close()
// start a new subscription on the new channel list (old channel list + the new channel the user asks to subscribe to)
pubSub := rdb.Subscribe(channels...)
u.channelsHandler = pubSub

stopListenerChan: Golang channel used to stop the goroutine that handles the current subscription. It is tied to the channelsHandler.

MessageChan: Golang channel that carries messages from the Redis subscription goroutine to the WebSocket sender goroutine, so whenever a new message arrives on a Redis channel, we immediately push it to the WebSocket connection.

// The Listener Goroutine:
go func() {
	u.listening = true
	fmt.Println("starting the listener for user:", u.name, "on channels:", channels)
loop:
	for {
		select {
		case msg, ok := <-pubSub.Channel():
			if !ok {
				break loop // a bare break would only leave the select, not the for loop
			}
			u.MessageChan <- *msg
		case <-u.stopListenerChan:
			break loop
		}
	}
}()

Here’s the implementation of the Connect function and the Subscribe and Unsubscribe methods of the User type:

// Connect registers the user and subscribes them to their channels on Redis.
func Connect(rdb *redis.Client, name string) (*User, error) {
	if _, err := rdb.SAdd(usersKey, name).Result(); err != nil {
		return nil, err
	}
	u := &User{
		name:             name,
		stopListenerChan: make(chan struct{}),
		MessageChan:      make(chan redis.Message),
	}
	if err := u.connect(rdb); err != nil {
		return nil, err
	}
	return u, nil
}

// Subscribe adds channel to the user's channel set and re-subscribes.
func (u *User) Subscribe(rdb *redis.Client, channel string) error {
	userChannelsKey := fmt.Sprintf(userChannelFmt, u.name)
	if rdb.SIsMember(userChannelsKey, channel).Val() {
		return nil
	}
	if err := rdb.SAdd(userChannelsKey, channel).Err(); err != nil {
		return err
	}
	return u.connect(rdb)
}

// Unsubscribe removes channel from the user's channel set and re-subscribes.
func (u *User) Unsubscribe(rdb *redis.Client, channel string) error {
	userChannelsKey := fmt.Sprintf(userChannelFmt, u.name)
	if !rdb.SIsMember(userChannelsKey, channel).Val() {
		return nil
	}
	if err := rdb.SRem(userChannelsKey, channel).Err(); err != nil {
		return err
	}
	return u.connect(rdb)
}

Simple and straightforward: the user's channels are stored in Redis and kept up to date by the subscribe/unsubscribe operations, and then the connect method subscribes to the full list of channels the user has.
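Subscribe and Unsubscribe first ask SISMEMBER and then call SADD or SREM. Since SADD and SREM themselves return how many members they actually added or removed, the same "did anything change?" decision could likely be made in one round trip. The sketch below models only that decision with an in-memory set; channelSet is a hypothetical stand-in for the `user:<name>:channels` Redis set, and in the real code the boolean would come from the SAdd/SRem result and gate the call to u.connect.

```go
package main

import "fmt"

// channelSet is an in-memory stand-in for the Redis set holding a user's
// channels (hypothetical helper, not from the repository).
type channelSet map[string]struct{}

// add reports whether ch was newly added, like SADD returning 1 instead of 0.
func (s channelSet) add(ch string) bool {
	if _, ok := s[ch]; ok {
		return false
	}
	s[ch] = struct{}{}
	return true
}

// remove reports whether ch was present, like SREM returning 1 instead of 0.
func (s channelSet) remove(ch string) bool {
	if _, ok := s[ch]; !ok {
		return false
	}
	delete(s, ch)
	return true
}

func main() {
	s := channelSet{}
	fmt.Println(s.add("new_channel1"))    // true: re-subscribe needed
	fmt.Println(s.add("new_channel1"))    // false: already subscribed, skip
	fmt.Println(s.remove("new_channel1")) // true: re-subscribe needed
	fmt.Println(s.remove("new_channel1")) // false: nothing to do
}
```

Only a `true` result should trigger the (comparatively expensive) close-and-resubscribe cycle.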

Here’s the implementation of the connect method:

func (u *User) connect(rdb *redis.Client) error {
	var c []string

	// get the default channels every user is subscribed to ("general", "random")
	c1, err := rdb.SMembers(ChannelsKey).Result()
	if err != nil {
		return err
	}
	c = append(c, c1...)

	// get all user-specific channels (from the DB)
	c2, err := rdb.SMembers(fmt.Sprintf(userChannelFmt, u.name)).Result()
	if err != nil {
		return err
	}
	c = append(c, c2...)

	if len(c) == 0 {
		fmt.Println("no channels to connect to for user: ", u.name)
		return nil
	}

	// clean up the old subscription (its TCP connection) and its listener goroutine
	if u.channelsHandler != nil {
		if err := u.channelsHandler.Unsubscribe(); err != nil {
			return err
		}
		if err := u.channelsHandler.Close(); err != nil {
			return err
		}
	}
	if u.listening {
		u.stopListenerChan <- struct{}{}
	}

	return u.doConnect(rdb, c...)
}

func (u *User) doConnect(rdb *redis.Client, channels ...string) error {
	// subscribe all channels in one request
	pubSub := rdb.Subscribe(channels...)
	// keep channel handler to be used in unsubscribe
	u.channelsHandler = pubSub

	// The Listener
	go func() {
		u.listening = true
		fmt.Println("starting the listener for user:", u.name, "on channels:", channels)
	loop:
		for {
			select {
			case msg, ok := <-pubSub.Channel():
				if !ok {
					break loop
				}
				u.MessageChan <- *msg

			case <-u.stopListenerChan:
				fmt.Println("stopping the listener for user:", u.name)
				break loop
			}
		}
	}()
	return nil
}

The connect method retrieves the user's channel list and then cleans up the old state: it uses channelsHandler to close any existing TCP connection and stopListenerChan to stop the running goroutine, if any.

Then it calls doConnect, which subscribes to the channel list. doConnect (the most important method so far) does the following:

  1. Subscribes to the whole list of channels over one TCP connection.
  2. Captures a reference to that connection (u.channelsHandler = pubSub).
  3. Starts a goroutine that handles the messages received from Redis on any of the subscribed channels (again, this goroutine can be stopped by sending on the Golang channel stopListenerChan).
  4. Sends each message received from Redis to the Golang channel u.MessageChan. Another goroutine ranges over this channel and pushes every new message to the WebSocket client.
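The listener loop can be exercised without Redis by replacing pubSub.Channel() and u.MessageChan with plain Go channels. The sketch below assumes that substitution; listen is a hypothetical function name. It keeps the labeled break from doConnect and adds two variations: the send to out also watches stop, and the stop signal is delivered by closing the channel. In the connect method above, the signal is a send on the unbuffered stopListenerChan, and such a send blocks until some goroutine receives it, so if the listener has already left its loop for any reason nothing will ever receive it. A close never blocks, at the cost of needing a fresh stop channel for every new subscription (a channel can only be closed once).

```go
package main

import "fmt"

// listen forwards everything from src to out until src is closed or stop is
// closed. src stands in for pubSub.Channel() and out for u.MessageChan.
func listen(src <-chan string, out chan<- string, stop <-chan struct{}) {
loop:
	for {
		select {
		case m, ok := <-src:
			if !ok {
				break loop // subscription closed
			}
			// don't get stuck on a slow consumer once we are asked to stop
			select {
			case out <- m:
			case <-stop:
				break loop
			}
		case <-stop:
			break loop // asked to stop, e.g. before re-subscribing
		}
	}
}

func main() {
	src := make(chan string)
	out := make(chan string)
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() { listen(src, out, stop); close(done) }()

	src <- "hello"
	fmt.Println(<-out) // hello
	close(stop)        // never blocks, even if the listener had already exited
	<-done
	fmt.Println("listener stopped")
}
```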

There is also a Disconnect method that cleans up the resources the same way connect does, but doesn't subscribe to a new channel list. It also closes MessageChan (the Golang channel between Redis and the WebSocket), as it is no longer needed.

One last method is Chat, which is as simple as the following:
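Closing MessageChan in Disconnect matters because the goroutine that writes to the WebSocket ranges over it, and a `for range` over a channel only ends when the channel is closed. The sketch below assumes a slice in place of conn.WriteJSON so it runs standalone; writer is a hypothetical name. Note the ordering it relies on: the channel should be closed only after the listener goroutine has stopped sending, because a send on a closed channel panics.

```go
package main

import (
	"fmt"
	"strings"
)

// writer mimics the goroutine that pushes messages to the WebSocket: it
// appends each message to *written and signals done once msgs is closed.
func writer(msgs <-chan string, written *[]string, done chan<- struct{}) {
	for m := range msgs {
		*written = append(*written, m)
	}
	close(done)
}

func main() {
	msgs := make(chan string)
	done := make(chan struct{})
	var written []string
	go writer(msgs, &written, done)

	msgs <- "hello"
	msgs <- "bye"
	close(msgs) // what Disconnect does with MessageChan, after the listener stopped
	<-done      // the writer goroutine has exited instead of leaking
	fmt.Println(strings.Join(written, ",")) // hello,bye
}
```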

func Chat(rdb *redis.Client, channel string, content string) error {
	return rdb.Publish(channel, content).Err()
}

It simply publishes a message to a channel.

The WebSocket Chat Handler Method:

func ChatWebSocketHandler(w http.ResponseWriter, r *http.Request, rdb *redis.Client) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		handleWSError(err, conn)
		return
	}
	err = onConnect(r, conn, rdb)
	if err != nil {
		handleWSError(err, conn)
		return
	}
	closeCh := onDisconnect(r, conn, rdb)
	// on receiving messages from Redis channels
	onChannelMessage(conn, r)
loop:
	for {
		select {
		case <-closeCh:
			break loop
		default:
			onUserMessage(conn, r, rdb)
		}
	}
}
  1. We upgrade the connection (using the Gorilla Toolkit WebSocket implementation).
  2. Call onConnect, which retrieves the username from the URL, calls user.Connect to subscribe to the channels the user is subscribed to, and adds the user to the connectedUsers map.
  3. Call onDisconnect, which registers a WebSocket close handler that calls user.Disconnect and removes the user from the connectedUsers map. It returns a Golang channel that is used to break the WebSocket loop, as we will see at the end of this method.
  4. Call onChannelMessage, which looks like:
func onChannelMessage(conn *websocket.Conn, r *http.Request) {
	username := r.URL.Query()["username"][0]
	u := connectedUsers[username]
	go func() {
		for m := range u.MessageChan {
			msg := msg{
				Content: m.Payload,
				Channel: m.Channel,
				Command: 0,
			}
			if err := conn.WriteJSON(msg); err != nil {
				fmt.Println(err)
			}
		}
	}()
}

It retrieves the user from the connectedUsers map and starts a goroutine that reads from u.MessageChan, which is written by the goroutine subscribed to the Redis channels. It then writes each message to the WebSocket connection, back to the client.

5. The WebSocket loop:

loop:
	for {
		select {
		case <-closeCh:
			break loop
		default:
			onUserMessage(conn, r, rdb)
		}
	}

It loops until closeCh (returned from onDisconnect in #3 above) fires; until then, it calls onUserMessage, which parses the message coming over the WebSocket connection and acts based on the command type. Here’s the implementation:

type msg struct {
	Content string `json:"content,omitempty"`
	Channel string `json:"channel,omitempty"`
	Command int    `json:"command,omitempty"`
	Err     string `json:"err,omitempty"`
}

const (
	commandSubscribe = iota
	commandUnsubscribe
	commandChat
)

func onUserMessage(conn *websocket.Conn, r *http.Request, rdb *redis.Client) {
	var msg msg
	if err := conn.ReadJSON(&msg); err != nil {
		handleWSError(err, conn)
		return
	}
	username := r.URL.Query()["username"][0]
	u := connectedUsers[username]
	switch msg.Command {
	case commandSubscribe:
		if err := u.Subscribe(rdb, msg.Channel); err != nil {
			handleWSError(err, conn)
		}
	case commandUnsubscribe:
		if err := u.Unsubscribe(rdb, msg.Channel); err != nil {
			handleWSError(err, conn)
		}
	case commandChat:
		if err := user.Chat(rdb, msg.Channel, msg.Content); err != nil {
			handleWSError(err, conn)
		}
	}
}

It reads the JSON coming from the JavaScript WebSocket client, retrieves the user from the connectedUsers map, and then switches over the Command field of the message, which can be one of the following:

  • commandSubscribe: accepts a channel name from the user and subscribes the user to the channel (by calling u.Subscribe)
  • commandUnsubscribe: accepts a channel name from the user and unsubscribes the user from the channel (by calling u.Unsubscribe)
  • commandChat: calls user.Chat to send a text message to a channel.

The API handlers

Besides the user and the Websocket Chat components, we have the last component that represents the API:

r.Path("/chat").Methods("GET").HandlerFunc(api.ChatWebSocketHandler)
r.Path("/users").Methods("GET").HandlerFunc(api.UsersHandler)
r.Path("/user/{user}/channels").Methods("GET").HandlerFunc(api.UserChannelsHandler)

We covered ChatWebSocketHandler in the previous section; let’s see the implementation of UsersHandler and UserChannelsHandler:

func UserChannelsHandler(w http.ResponseWriter, r *http.Request, rdb *redis.Client) {
	username := mux.Vars(r)["user"]
	list, err := user.GetChannels(rdb, username)
	if err != nil {
		handleError(err, w)
		return
	}
	err = json.NewEncoder(w).Encode(list)
	if err != nil {
		handleError(err, w)
		return
	}
}

func UsersHandler(w http.ResponseWriter, r *http.Request, rdb *redis.Client) {
	list, err := user.List(rdb)
	if err != nil {
		handleError(err, w)
		return
	}
	err = json.NewEncoder(w).Encode(list)
	if err != nil {
		handleError(err, w)
		return
	}
}

That’s all for the coding part. Again, you can look at the whole implementation on GitHub: https://github.com/mhewedy-playground/Chat

In the next section we will see the Chat Service in action.

Chat Service in Action

Let’s open two Chrome windows side by side to make testing easy.

You need to start the Golang chat service first and make sure it connects to the Redis server; you might find the first post of the series helpful for installing and configuring Redis inside an Ubuntu VM in minutes.

Connect and subscribe:

First, we need to start the Chat Service:

$ go build; ./chat

Let’s start a WebSocket client in each tab. In the left tab, type:

let socket = new WebSocket("ws://localhost:8080/chat?username=wael");
socket.onmessage = function(event) {
console.log(`[message] Data received from server: ${event.data}`);
};

And in the right tab, type:

let socket = new WebSocket("ws://localhost:8080/chat?username=mazen");
socket.onmessage = function(event) {
console.log(`[message] Data received from server: ${event.data}`);
};

Notice the Chat Service log:

connected from: [::1]:55471 user: wael
starting the listener for user: wael on channels: [general random]
connected from: [::1]:55490 user: mazen
starting the listener for user: mazen on channels: [general random]

Let’s look at the Redis clients using redis-cli:

127.0.0.1:6379> client list
id=325 addr=127.0.0.1:58944 fd=9 name= age=98745 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=32742 obl=0 oll=0 omem=0 events=r cmd=client
id=852 addr=127.0.0.1:60006 fd=8 name= age=107 idle=104 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=smembers
id=853 addr=127.0.0.1:60008 fd=10 name= age=107 idle=17 flags=P db=0 sub=2 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=854 addr=127.0.0.1:60010 fd=11 name= age=104 idle=14 flags=P db=0 sub=2 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping

Notice we have 4 clients connected to Redis: the first is redis-cli itself, the second is the connection used by the regular Redis commands inside the service (publish, smembers, sadd, etc.), and the last two are the subscription connections of the two clients (left tab client and right tab client).

Note that each client has only one open connection for all the channels it listens to.

Now let’s subscribe the left tab client to 3 more channels and recheck the client list. In the left tab, type:

socket.send('{"command": 0, "channel": "new_channel1"}')
socket.send('{"command": 0, "channel": "new_channel2"}')
socket.send('{"command": 0, "channel": "new_channel3"}')

Again, command=0 means subscribe, command=1 means unsubscribe, and command=2 means send a chat message.

Notice the chat service log:

starting the listener for user: wael on channels: [general random new_channel1]
starting the listener for user: wael on channels: [general random new_channel2 new_channel1]
starting the listener for user: wael on channels: [general random new_channel3 new_channel2 new_channel1]

This indicates that user “wael” (the user on the left) is now listening to 5 channels in total, but let’s check the TCP connections to Redis to make sure we don’t use too many of them:

127.0.0.1:6379> client list
id=325 addr=127.0.0.1:58944 fd=9 name= age=99646 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=32742 obl=0 oll=0 omem=0 events=r cmd=client
id=882 addr=127.0.0.1:60066 fd=8 name= age=99 idle=83 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=smembers
id=884 addr=127.0.0.1:60070 fd=11 name= age=93 idle=3 flags=P db=0 sub=2 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
id=887 addr=127.0.0.1:60076 fd=10 name= age=83 idle=23 flags=P db=0 sub=5 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping

Great, we still use 4 connections. You might notice that the connection ids have changed, and one subscription connection now reports sub=5.

Send message:

Now let’s send a message from the left client to “general” channel:

socket.send('{"command": 2, "channel": "general", "content": "some general content"}')

The message is published to the “general” channel, received by both clients, and pushed to the WebSocket of each of them.

Now, if we push a message to one of the channels that only the left client is subscribed to (e.g. “new_channel1”), it will not appear on the right client.

Let’s have a look at Redis data structure:

127.0.0.1:6379> keys *
1) "users"
2) "user:wael:channels"
3) "channels"
127.0.0.1:6379> smembers users
1) "mazen"
2) "wael"
127.0.0.1:6379> smembers channels
1) "general"
2) "random"
127.0.0.1:6379> smembers user:wael:channels
1) "new_channel3"
2) "new_channel2"
3) "new_channel1"

Notice that all data related to our chat service is persisted in Redis, which helps the chat service scale horizontally.

We could also rely on the atomicity of the Redis INCR command to tag each chat message with a unique ID, to guarantee the uniqueness of each message received by clients.
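The INCR idea could look roughly like this: before publishing, wrap the content in an envelope whose ID comes from incrementing a shared counter, so every chat service instance draws from the same sequence. This is a sketch under stated assumptions: the envelope format is hypothetical, and newCounter is an in-process stand-in for calling INCR on a hypothetical counter key (INCR on a missing key also starts at 1). The resulting string is what would be passed to Chat as the content.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
)

// envelope is a hypothetical payload format carrying a unique message ID.
type envelope struct {
	ID      int64  `json:"id"`
	Content string `json:"content"`
}

// newCounter stands in for Redis INCR on a shared key: each call returns the
// next integer, starting at 1. In-process only; INCR would make it global.
func newCounter() func() int64 {
	var n int64
	return func() int64 { return atomic.AddInt64(&n, 1) }
}

// wrap tags content with the next ID and encodes it as the published payload.
func wrap(nextID func() int64, content string) (string, error) {
	b, err := json.Marshal(envelope{ID: nextID(), Content: content})
	return string(b), err
}

func main() {
	next := newCounter()
	a, _ := wrap(next, "hi")
	b, _ := wrap(next, "again")
	fmt.Println(a) // {"id":1,"content":"hi"}
	fmt.Println(b) // {"id":2,"content":"again"}
}
```

Clients could then drop any message whose ID they have already seen.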

Disconnect:

Let’s disconnect the client on the right and check the Redis users set again.

We have an API that lists the users we have:

watch curl -s http://localhost:8080/users

It shows in my case:

Every 2.0s: curl -s http://localhost...  Muhammads-MacBook-Pro.local: Sun May 24 11:21:29 2020

["mazen","wael"]

Now let’s go to the right tab of chrome and close the connection:

socket.close()

You will notice that the output of the watch curl command changes:

Every 2.0s: curl -s http://localhost...  Muhammads-MacBook-Pro.local: Sun May 24 11:22:16 2020

["wael"]

And let’s check Redis:

127.0.0.1:6379> smembers users
1) "wael"

👏👏👏

One handy command I used to debug the commands sent to Redis is MONITOR, which prints every command the Redis server receives.

Testing Horizontal Scalability

Next, let’s start 2 Chat services and let’s connect each client to one of them:

$ go build; PORT=8080 ./chat & PORT=8081 ./chat &
[1] 51535
[2] 51536

Now let’s open Chrome and point the left client to the service on port 8080 and the right client to the service on port 8081:

On the tab on the left, paste:

let socket = new WebSocket("ws://localhost:8080/chat?username=wael");
socket.onmessage = function(event) {
console.log(`[message] Data received from server: ${event.data}`);
};

And on the tab on the right, paste:

let socket = new WebSocket("ws://localhost:8081/chat?username=mazen");
socket.onmessage = function(event) {
console.log(`[message] Data received from server: ${event.data}`);
};

In the left tab, let’s send a message to the “general” channel, which both clients are subscribed to:

socket.send('{"command": 2, "channel": "general", "content": "some general content"}')

And yes, it is working:

👏👏👏 🎉🎉🎉🎉🎉

We have seen how to use Redis with Golang’s powerful goroutines and channels, combined with WebSockets, using the Gorilla Toolkit WebSocket and go-redis libraries, to build a horizontally scalable multi-channel chat service.

Please note that the code is neither the best nor production-grade; it was written as a PoC.

Again, you can find the source code on GitHub: https://github.com/mhewedy-playground/Chat
