Using RabbitMQ for RPC (Remote Procedure Call) and Pub/Sub (Publish/Subscribe) in Go

RabbitMQ is a powerful message broker that can be used for various messaging patterns. One of the most interesting and useful patterns is RPC (Remote Procedure Call), which allows a client to execute a procedure on a remote server and receive the result. Another common pattern is Pub/Sub (Publish/Subscribe), which allows messages to be broadcast to multiple consumers. In this blog post, we will explore how to implement both RPC and Pub/Sub using RabbitMQ in Go (Golang).

Introduction

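RPC lets one program request a service from a program running on another machine in a network. RabbitMQ supports this with a pair of queues: the client publishes a request to a well-known request queue and names a reply queue in the message's reply-to property, and the server publishes its answer to that reply queue. A correlation ID carried on both messages lets the client match each reply to its request. We will set up a simple RPC system where a client sends a number n to the server, and the server returns the n-th Fibonacci number.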

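Pub/Sub is a messaging pattern where messages are published to an exchange rather than directly to a queue. With a fanout exchange, which this post uses, the exchange copies each message to every queue bound to it, so every consumer that has its own queue receives its own copy of the message.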
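
To make the broadcast behavior concrete, here is a toy model of an exchange that delivers to every bound queue. It is only a sketch using Go channels, with no broker involved; the fanout type and its bind and publish methods are hypothetical names, and the model is not safe for concurrent binds and publishes.

```go
package main

import "fmt"

// fanout is a toy stand-in for a fanout exchange: it keeps a list of
// bound queues and copies every published message to each of them.
type fanout struct {
	queues []chan string
}

// bind creates a new buffered queue, attaches it to the exchange, and
// returns it to the caller for consuming.
func (f *fanout) bind(size int) <-chan string {
	q := make(chan string, size)
	f.queues = append(f.queues, q)
	return q
}

// publish delivers msg to every bound queue. There is no routing key
// parameter because this model always delivers to all queues.
func (f *fanout) publish(msg string) {
	for _, q := range f.queues {
		q <- msg
	}
}

func main() {
	ex := &fanout{}
	a := ex.bind(1)
	b := ex.bind(1)

	ex.publish("Log message 0")

	fmt.Println("consumer A got:", <-a) // prints: consumer A got: Log message 0
	fmt.Println("consumer B got:", <-b) // prints: consumer B got: Log message 0
}
```

Note that a queue bound after a message was published does not see that message. The same holds for this sketch: only queues present at publish time receive a copy.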

Prerequisites

  1. A basic understanding of Go programming.
  2. RabbitMQ installed and running. You can follow the RabbitMQ installation guide if you haven't installed it yet.
  3. Go installed on your machine. You can download it from the official Go website.

Setting Up the Project

First, let's set up a new Go project:

mkdir rabbitmq-patterns
cd rabbitmq-patterns
go mod init rabbitmq-patterns
go get github.com/streadway/amqp

Writing the RPC Server

The server will receive requests from the client, process them, and send back the results.

Create a file named rpc_server.go:

package main

import (
	"log"
	"strconv"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

// fibonacci returns the n-th Fibonacci number for n >= 0.
func fibonacci(n int) int {
	if n == 0 {
		return 0
	} else if n == 1 {
		return 1
	} else {
		return fibonacci(n-1) + fibonacci(n-2)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"rpc_queue", // name
		false,       // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			n, err := strconv.Atoi(string(d.Body))
			failOnError(err, "Failed to convert body to integer")

			log.Printf(" [.] fib(%d)", n)
			response := fibonacci(n)

			// Reply on the queue named by the client, echoing its correlation ID.
			err = ch.Publish(
				"",        // exchange
				d.ReplyTo, // routing key
				false,     // mandatory
				false,     // immediate
				amqp.Publishing{
					ContentType:   "text/plain",
					CorrelationId: d.CorrelationId,
					Body:          []byte(strconv.Itoa(response)),
				})
			failOnError(err, "Failed to publish a message")

			d.Ack(false)
		}
	}()

	log.Printf(" [x] Awaiting RPC requests")
	<-forever
}

Writing the RPC Client

The client will send a number to the server and wait for the result.

Create a file named rpc_client.go:

package main

import (
	"log"
	"math/rand"
	"strconv"
	"time"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"",    // name
		false, // durable
		false, // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	corrId := randomString(32)

	n := 30 // Number to send to the server
	err = ch.Publish(
		"",          // exchange
		"rpc_queue", // routing key
		false,       // mandatory
		false,       // immediate
		amqp.Publishing{
			ContentType:   "text/plain",
			CorrelationId: corrId,
			ReplyTo:       q.Name,
			Body:          []byte(strconv.Itoa(n)),
		})
	failOnError(err, "Failed to publish a message")

	for d := range msgs {
		if corrId == d.CorrelationId {
			response, err := strconv.Atoi(string(d.Body))
			failOnError(err, "Failed to convert body to integer")
			log.Printf(" [.] Got %d", response)
			break
		}
	}
}

func randomString(l int) string {
	rand.Seed(time.Now().UnixNano())
	bytes := make([]byte, l)
	for i := range bytes {
		bytes[i] = byte(randInt(65, 90))
	}
	return string(bytes)
}

func randInt(min int, max int) int {
	return min + rand.Intn(max-min)
}

Writing the Pub/Sub Publisher

The publisher will send messages to an exchange, which will then broadcast the messages to all bound queues.

Create a file named publisher.go:

package main

import (
	"log"
	"strconv"
	"time"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	// Publish ten messages, one per second. A fanout exchange ignores
	// the routing key, so it is left empty.
	for i := 0; i < 10; i++ {
		body := "Log message " + strconv.Itoa(i)
		err = ch.Publish(
			"logs", // exchange
			"",     // routing key
			false,  // mandatory
			false,  // immediate
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(body),
			})
		failOnError(err, "Failed to publish a message")
		log.Printf(" [x] Sent %s", body)
		time.Sleep(1 * time.Second)
	}
}

Writing the Pub/Sub Consumer

The consumer will receive messages from the exchange and process them.

Create a file named consumer.go:

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

err
